diff --git a/crates/vsock/src/vhu_vsock.rs b/crates/vsock/src/vhu_vsock.rs index 27108ff1..bb59e052 100644 --- a/crates/vsock/src/vhu_vsock.rs +++ b/crates/vsock/src/vhu_vsock.rs @@ -317,7 +317,7 @@ impl VhostUserBackend for VhostUserVsockBackend { } } - if device_event != EVT_QUEUE_EVENT && thread.thread_backend.pending_rx() { + if device_event != EVT_QUEUE_EVENT { thread.process_rx(vring_rx, evt_idx)?; } diff --git a/crates/vsock/src/vhu_vsock_thread.rs b/crates/vsock/src/vhu_vsock_thread.rs index ed62a4ae..6954913f 100644 --- a/crates/vsock/src/vhu_vsock_thread.rs +++ b/crates/vsock/src/vhu_vsock_thread.rs @@ -68,6 +68,9 @@ pub(crate) struct VhostUserVsockThread { /// EventFd to notify this thread for custom events. Currently used to notify /// this thread to process raw vsock packets sent from a sibling VM. pub sibling_event_fd: EventFd, + /// Keeps track of which RX queue was processed first in the last iteration. + /// Used to alternate between the RX queues to prevent the starvation of one by the other. + last_processed: RxQueueType, } impl VhostUserVsockThread { @@ -125,6 +128,7 @@ impl VhostUserVsockThread { local_port: Wrapping(0), tx_buffer_size, sibling_event_fd, + last_processed: RxQueueType::Standard, }; VhostUserVsockThread::epoll_register(epoll_fd, host_raw_fd, epoll::Events::EPOLLIN)?; @@ -527,7 +531,7 @@ impl VhostUserVsockThread { } /// Wrapper to process rx queue based on whether event idx is enabled or not. - pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + pub fn process_standard_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { if event_idx { // To properly handle EVENT_IDX we need to keep calling // process_rx_queue until it stops finding new requests @@ -550,6 +554,50 @@ impl VhostUserVsockThread { Ok(false) } + /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not. + pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + if event_idx { + loop { + if !self.thread_backend.pending_raw_pkts() { + break; + } + vring.disable_notification().unwrap(); + + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + if !vring.enable_notification().unwrap() { + break; + } + } + } else { + self.process_rx_queue(vring, RxQueueType::RawPkts)?; + } + Ok(false) + } + + pub fn process_rx(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { + match self.last_processed { + RxQueueType::Standard => { + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + self.last_processed = RxQueueType::RawPkts; + } + if self.thread_backend.pending_rx() { + self.process_standard_rx(vring, event_idx)?; + } + } + RxQueueType::RawPkts => { + if self.thread_backend.pending_rx() { + self.process_standard_rx(vring, event_idx)?; + self.last_processed = RxQueueType::Standard; + } + if self.thread_backend.pending_raw_pkts() { + self.process_raw_pkts(vring, event_idx)?; + } + } + } + Ok(false) + } + /// Process tx queue and send requests to the backend for processing. fn process_tx_queue(&mut self, vring: &VringRwLock) -> Result { let mut used_any = false; @@ -645,26 +693,6 @@ impl VhostUserVsockThread { } Ok(false) } - - /// Wrapper to process raw vsock packets queue based on whether event idx is enabled or not. - pub fn process_raw_pkts(&mut self, vring: &VringRwLock, event_idx: bool) -> Result { - if event_idx { - loop { - if !self.thread_backend.pending_raw_pkts() { - break; - } - vring.disable_notification().unwrap(); - - self.process_rx_queue(vring, RxQueueType::RawPkts)?; - if !vring.enable_notification().unwrap() { - break; - } - } - } else { - self.process_rx_queue(vring, RxQueueType::RawPkts)?; - } - Ok(false) - } } impl Drop for VhostUserVsockThread {