Skip to content

Commit 7b2632b

Browse files
ikichavireshk
authored andcommitted
[vsock] refactor VhostUserVsockThread worker
For now, VhostUserVsockThread uses thread pool executor from futures, but it doesn't need to use thread pool executor and futures because we just need background worker thread, and a way to let it work. So I removed unnecessary external dependency and made the logic simpler by using just thread and channel Signed-off-by: Jeongik Cha <[email protected]>
1 parent ed5b597 commit 7b2632b

File tree

4 files changed

+71
-189
lines changed

4 files changed

+71
-189
lines changed

Cargo.lock

Lines changed: 0 additions & 128 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/vsock/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ byteorder = "1"
1717
clap = { version = "4.4", features = ["derive"] }
1818
env_logger = "0.10"
1919
epoll = "4.3.2"
20-
futures = { version = "0.3", features = ["thread-pool"] }
2120
log = "0.4"
2221
thiserror = "1.0"
2322
vhost = { version = "0.8", features = ["vhost-user-slave"] }

crates/vsock/src/vhu_vsock.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,6 @@ pub(crate) enum Error {
115115
IterateQueue,
116116
#[error("No rx request available")]
117117
NoRequestRx,
118-
#[error("Unable to create thread pool")]
119-
CreateThreadPool(std::io::Error),
120118
#[error("Packet missing data buffer")]
121119
PktBufMissing,
122120
#[error("Failed to connect to unix socket")]

crates/vsock/src/vhu_vsock_thread.rs

Lines changed: 71 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ use std::{
1212
net::{UnixListener, UnixStream},
1313
prelude::{AsRawFd, FromRawFd, RawFd},
1414
},
15-
sync::{Arc, RwLock},
15+
sync::mpsc::Sender,
16+
sync::{mpsc, Arc, RwLock},
17+
thread,
1618
};
1719

18-
use futures::executor::{ThreadPool, ThreadPoolBuilder};
1920
use log::warn;
2021
use vhost_user_backend::{VringEpollHandler, VringRwLock, VringT};
2122
use virtio_queue::QueueOwnedT;
@@ -42,6 +43,15 @@ enum RxQueueType {
4243
Standard,
4344
RawPkts,
4445
}
46+
47+
// Data which is required by a worker handling event idx.
48+
struct EventData {
49+
vring: VringRwLock,
50+
event_idx: bool,
51+
head_idx: u16,
52+
used_len: usize,
53+
}
54+
4555
pub(crate) struct VhostUserVsockThread {
4656
/// Guest memory map.
4757
pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
@@ -61,8 +71,8 @@ pub(crate) struct VhostUserVsockThread {
6171
pub thread_backend: VsockThreadBackend,
6272
/// CID of the guest.
6373
guest_cid: u64,
64-
/// Thread pool to handle event idx.
65-
pool: ThreadPool,
74+
/// Channel to a worker which handles event idx.
75+
sender: Sender<EventData>,
6676
/// host side port on which application listens.
6777
local_port: Wrapping<u32>,
6878
/// The tx buffer size
@@ -126,7 +136,15 @@ impl VhostUserVsockThread {
126136
),
127137
);
128138
}
129-
139+
let (sender, receiver) = mpsc::channel::<EventData>();
140+
thread::spawn(move || loop {
141+
// TODO: Understand why doing the following in the background thread works.
142+
// maybe we'd better have thread pool for the entire application if necessary.
143+
let Ok(event_data) = receiver.recv() else {
144+
break;
145+
};
146+
Self::vring_handle_event(event_data);
147+
});
130148
let thread = VhostUserVsockThread {
131149
mem: None,
132150
event_idx: false,
@@ -137,10 +155,7 @@ impl VhostUserVsockThread {
137155
epoll_file,
138156
thread_backend,
139157
guest_cid,
140-
pool: ThreadPoolBuilder::new()
141-
.pool_size(1)
142-
.create()
143-
.map_err(Error::CreateThreadPool)?,
158+
sender,
144159
local_port: Wrapping(0),
145160
tx_buffer_size,
146161
sibling_event_fd,
@@ -152,6 +167,37 @@ impl VhostUserVsockThread {
152167
Ok(thread)
153168
}
154169

170+
fn vring_handle_event(event_data: EventData) {
171+
if event_data.event_idx {
172+
if event_data
173+
.vring
174+
.add_used(event_data.head_idx, event_data.used_len as u32)
175+
.is_err()
176+
{
177+
warn!("Could not return used descriptors to ring");
178+
}
179+
match event_data.vring.needs_notification() {
180+
Err(_) => {
181+
warn!("Could not check if queue needs to be notified");
182+
event_data.vring.signal_used_queue().unwrap();
183+
}
184+
Ok(needs_notification) => {
185+
if needs_notification {
186+
event_data.vring.signal_used_queue().unwrap();
187+
}
188+
}
189+
}
190+
} else {
191+
if event_data
192+
.vring
193+
.add_used(event_data.head_idx, event_data.used_len as u32)
194+
.is_err()
195+
{
196+
warn!("Could not return used descriptors to ring");
197+
}
198+
event_data.vring.signal_used_queue().unwrap();
199+
}
200+
}
155201
/// Register a file with an epoll to listen for events in evset.
156202
pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> {
157203
epoll::ctl(
@@ -504,31 +550,14 @@ impl VhostUserVsockThread {
504550

505551
let vring = vring.clone();
506552
let event_idx = self.event_idx;
507-
508-
self.pool.spawn_ok(async move {
509-
// TODO: Understand why doing the following in the pool works
510-
if event_idx {
511-
if vring.add_used(head_idx, used_len as u32).is_err() {
512-
warn!("Could not return used descriptors to ring");
513-
}
514-
match vring.needs_notification() {
515-
Err(_) => {
516-
warn!("Could not check if queue needs to be notified");
517-
vring.signal_used_queue().unwrap();
518-
}
519-
Ok(needs_notification) => {
520-
if needs_notification {
521-
vring.signal_used_queue().unwrap();
522-
}
523-
}
524-
}
525-
} else {
526-
if vring.add_used(head_idx, used_len as u32).is_err() {
527-
warn!("Could not return used descriptors to ring");
528-
}
529-
vring.signal_used_queue().unwrap();
530-
}
531-
});
553+
self.sender
554+
.send(EventData {
555+
vring,
556+
event_idx,
557+
head_idx,
558+
used_len,
559+
})
560+
.unwrap();
532561

533562
match rx_queue_type {
534563
RxQueueType::Standard => {
@@ -661,30 +690,14 @@ impl VhostUserVsockThread {
661690

662691
let vring = vring.clone();
663692
let event_idx = self.event_idx;
664-
665-
self.pool.spawn_ok(async move {
666-
if event_idx {
667-
if vring.add_used(head_idx, used_len as u32).is_err() {
668-
warn!("Could not return used descriptors to ring");
669-
}
670-
match vring.needs_notification() {
671-
Err(_) => {
672-
warn!("Could not check if queue needs to be notified");
673-
vring.signal_used_queue().unwrap();
674-
}
675-
Ok(needs_notification) => {
676-
if needs_notification {
677-
vring.signal_used_queue().unwrap();
678-
}
679-
}
680-
}
681-
} else {
682-
if vring.add_used(head_idx, used_len as u32).is_err() {
683-
warn!("Could not return used descriptors to ring");
684-
}
685-
vring.signal_used_queue().unwrap();
686-
}
687-
});
693+
self.sender
694+
.send(EventData {
695+
vring,
696+
event_idx,
697+
head_idx,
698+
used_len,
699+
})
700+
.unwrap();
688701
}
689702

690703
Ok(used_any)

0 commit comments

Comments
 (0)