diff --git a/crates/vsock/src/main.rs b/crates/vsock/src/main.rs index 4ff0f933..79f11baa 100644 --- a/crates/vsock/src/main.rs +++ b/crates/vsock/src/main.rs @@ -195,10 +195,6 @@ pub(crate) fn start_backend_server( VhostUserVsockBackend::new(config.clone(), cid_map.clone()) .map_err(BackendError::CouldNotCreateBackend)?, ); - cid_map - .write() - .unwrap() - .insert(config.get_guest_cid(), backend.clone()); let listener = Listener::new(config.get_socket_path(), true).unwrap(); @@ -236,7 +232,6 @@ pub(crate) fn start_backend_server( // No matter the result, we need to shut down the worker thread. backend.exit_event.write(1).unwrap(); - cid_map.write().unwrap().remove(&config.get_guest_cid()); } } @@ -455,8 +450,7 @@ mod tests { let cid_map: Arc> = Arc::new(RwLock::new(HashMap::new())); - let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map.clone()).unwrap()); - cid_map.write().unwrap().insert(CID, backend.clone()); + let backend = Arc::new(VhostUserVsockBackend::new(config, cid_map).unwrap()); let daemon = VhostUserDaemon::new( String::from("vhost-user-vsock"), diff --git a/crates/vsock/src/thread_backend.rs b/crates/vsock/src/thread_backend.rs index 50294655..5677ec5b 100644 --- a/crates/vsock/src/thread_backend.rs +++ b/crates/vsock/src/thread_backend.rs @@ -23,6 +23,8 @@ use crate::{ vsock_conn::*, }; +pub(crate) type RawPktsQ = VecDeque; + pub(crate) struct RawVsockPacket { pub header: [u8; PKT_HEADER_SIZE], pub data: Vec, @@ -65,9 +67,9 @@ pub(crate) struct VsockThreadBackend { pub local_port_set: HashSet, tx_buffer_size: u32, /// Maps the guest CID to the corresponding backend. Used for sibling VM communication. - cid_map: Arc>, + pub cid_map: Arc>, /// Queue of raw vsock packets recieved from sibling VMs to be sent to the guest. - raw_pkts_queue: VecDeque, + pub raw_pkts_queue: Arc>, } impl VsockThreadBackend { @@ -92,7 +94,7 @@ impl VsockThreadBackend { local_port_set: HashSet::new(), tx_buffer_size, cid_map, - raw_pkts_queue: VecDeque::new(), + raw_pkts_queue: Arc::new(RwLock::new(VecDeque::new())), } } @@ -103,7 +105,7 @@ impl VsockThreadBackend { /// Checks if there are pending raw vsock packets to be sent to the guest. pub fn pending_raw_pkts(&self) -> bool { - !self.raw_pkts_queue.is_empty() + !self.raw_pkts_queue.read().unwrap().is_empty() } /// Deliver a vsock packet to the guest vsock driver. @@ -178,14 +180,13 @@ impl VsockThreadBackend { if dst_cid != VSOCK_HOST_CID { let cid_map = self.cid_map.read().unwrap(); if cid_map.contains_key(&dst_cid) { - let sibling_backend = cid_map.get(&dst_cid).unwrap(); - let mut sibling_backend_thread = sibling_backend.threads[0].lock().unwrap(); + let (sibling_raw_pkts_queue, sibling_event_fd) = cid_map.get(&dst_cid).unwrap(); - sibling_backend_thread - .thread_backend - .raw_pkts_queue + sibling_raw_pkts_queue + .write() + .unwrap() .push_back(RawVsockPacket::from_vsock_packet(pkt)?); - let _ = sibling_backend_thread.sibling_event_fd.write(1); + let _ = sibling_event_fd.write(1); } else { warn!("vsock: dropping packet for unknown cid: {:?}", dst_cid); } @@ -254,6 +255,8 @@ impl VsockThreadBackend { pub fn recv_raw_pkt(&mut self, pkt: &mut VsockPacket) -> Result<()> { let raw_vsock_pkt = self .raw_pkts_queue + .write() + .unwrap() .pop_front() .ok_or(Error::EmptyRawPktsQueue)?; @@ -436,10 +439,6 @@ mod tests { let sibling_backend = Arc::new(VhostUserVsockBackend::new(sibling_config, cid_map.clone()).unwrap()); - cid_map - .write() - .unwrap() - .insert(SIBLING_CID, sibling_backend.clone()); let epoll_fd = epoll::create(false).unwrap(); let mut vtp = diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index 7e2e9474..bb59e052 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -21,9 +21,10 @@ use vmm_sys_util::{ eventfd::{EventFd, EFD_NONBLOCK}, }; +use crate::thread_backend::RawPktsQ; use crate::vhu_vsock_thread::*; -pub(crate) type CidMap = HashMap>; +pub(crate) type CidMap = HashMap>, EventFd)>; const NUM_QUEUES: usize = 2; const QUEUE_SIZE: usize = 256; @@ -316,7 +317,7 @@ impl VhostUserBackend for VhostUserVsockBackend { } } - if device_event != EVT_QUEUE_EVENT && thread.thread_backend.pending_rx() { + if device_event != EVT_QUEUE_EVENT { thread.process_rx(vring_rx, evt_idx)?; } diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index a073abd7..726e2b64 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -68,6 +68,9 @@ pub(crate) struct VhostUserVsockThread { /// EventFd to notify this thread for custom events. Currently used to notify /// this thread to process raw vsock packets sent from a sibling VM. pub sibling_event_fd: EventFd, + /// Keeps track of which RX queue was processed first in the last iteration. + /// Used to alternate between the RX queues to prevent the starvation of one by the other. + last_processed: RxQueueType, } impl VhostUserVsockThread { @@ -92,21 +95,31 @@ impl VhostUserVsockThread { let sibling_event_fd = EventFd::new(EFD_NONBLOCK).map_err(Error::EventFdCreate)?; + let thread_backend = VsockThreadBackend::new( + uds_path.clone(), + epoll_fd, + guest_cid, + tx_buffer_size, + cid_map.clone(), + ); + + cid_map.write().unwrap().insert( + guest_cid, + ( + thread_backend.raw_pkts_queue.clone(), + sibling_event_fd.try_clone().unwrap(), + ), + ); + let thread = VhostUserVsockThread { mem: None, event_idx: false, host_sock: host_sock.as_raw_fd(), - host_sock_path: uds_path.clone(), + host_sock_path: uds_path, host_listener: host_sock, vring_worker: None, epoll_file, - thread_backend: VsockThreadBackend::new( - uds_path, - epoll_fd, - guest_cid, - tx_buffer_size, - cid_map, - ), + thread_backend, guest_cid, pool: ThreadPoolBuilder::new() .pool_size(1) @@ -115,6 +128,7 @@ impl VhostUserVsockThread { local_port: Wrapping(0), tx_buffer_size, sibling_event_fd, + last_processed: RxQueueType::Standard, }; VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?; @@ -517,7 +531,7 @@ impl VhostUserVsockThread { } /// Wrapper to process rx queue based on whether event idx is enabled or not. - pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + fn process_unix_sockets(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { if event_idx { // To properly handle EVENT_IDX we need to keep calling // process_rx_queue until it stops finding new requests @@ -540,6 +554,50 @@ impl VhostUserVsockThread { Ok(false) } + /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not. + pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + if event_idx { + loop { + if !self.thread_backend.pending_raw_pkts() { + break; + } + vring.disable_notification().unwrap(); + + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + if !vring.enable_notification().unwrap() { + break; + } + } + } else { + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + } + Ok(false) + } + + pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + match self.last_processed { + RxQueueType::Standard => { + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + self.last_processed = RxQueueType::RawPkts; + } + if self.thread_backend.pending_rx() { + self.process_unix_sockets(vring, event_idx)?; + } + } + RxQueueType::RawPkts => { + if self.thread_backend.pending_rx() { + self.process_unix_sockets(vring, event_idx)?; + self.last_processed = RxQueueType::Standard; + } + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + } + } + } + Ok(false) + } + /// Process tx queue and send requests to the backend for processing. fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result { let mut used_any = false; @@ -635,31 +693,16 @@ impl VhostUserVsockThread { } Ok(false) } - - /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not. - pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { - if event_idx { - loop { - if !self.thread_backend.pending_raw_pkts() { - break; - } - vring.disable_notification().unwrap(); - - self.process_rx_queue(vring, RxQueueType::RawPkts)?; - if !vring.enable_notification().unwrap() { - break; - } - } - } else { - self.process_rx_queue(vring, RxQueueType::RawPkts)?; - } - Ok(false) - } } impl Drop for VhostUserVsockThread { fn drop(&mut self) { let _ = std::fs::remove_file(&self.host_sock_path); + self.thread_backend + .cid_map + .write() + .unwrap() + .remove(&self.guest_cid); } } #[cfg(test)]