Skip to content

Commit a8ff939

Browse files
author
Sebastien Boeuf
committed
vhost_user: Add Inflight I/O tracking support
The inflight I/O tracking feature is useful for handling crashes and disconnections from the backend. It allows the backend to rely on a buffer that was shared earlier with the VMM to restore to the previous state it was before the crash. This feature depends on the availability of the protocol feature VHOST_USER_PROTOCOL_F_INFLIGHT_SHMFD, and it implements both VHOST_USER_GET_INFLIGHT_FD and VHOST_USER_SET_INFLIGHT_FD messages. Fixes #43 Signed-off-by: Sebastien Boeuf <[email protected]>
1 parent 30ba3e7 commit a8ff939

File tree

7 files changed

+201
-3
lines changed

7 files changed

+201
-3
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ vmm-sys-util = ">=0.3.1"
2626
vm-memory = { version = "0.2.0", optional = true }
2727

2828
[dev-dependencies]
29+
tempfile = ">=3.2.0"
2930
vm-memory = { version = "0.2.0", features=["backend-mmap"] }

coverage_config_x86_64.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"coverage_score": 81.0, "exclude_path": "src/vhost_kern/", "crate_features": "vhost-user-master,vhost-user-slave"}
1+
{"coverage_score": 80.9, "exclude_path": "src/vhost_kern/", "crate_features": "vhost-user-master,vhost-user-slave"}

src/vhost_user/dummy_slave.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
// Copyright (C) 2019 Alibaba Cloud Computing. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::os::unix::io::RawFd;
4+
use std::fs::File;
5+
use std::os::unix::io::{AsRawFd, RawFd};
56

67
use super::message::*;
78
use super::*;
@@ -25,6 +26,7 @@ pub struct DummySlaveReqHandler {
2526
pub err_fd: [Option<RawFd>; MAX_QUEUE_NUM],
2627
pub vring_started: [bool; MAX_QUEUE_NUM],
2728
pub vring_enabled: [bool; MAX_QUEUE_NUM],
29+
pub inflight_file: Option<File>,
2830
}
2931

3032
impl DummySlaveReqHandler {
@@ -245,6 +247,28 @@ impl VhostUserSlaveReqHandlerMut for DummySlaveReqHandler {
245247
Ok(())
246248
}
247249

250+
fn get_inflight_fd(
251+
&mut self,
252+
inflight: &VhostUserInflight,
253+
) -> Result<(VhostUserInflight, RawFd)> {
254+
let file = tempfile::tempfile().unwrap();
255+
let fd = file.as_raw_fd();
256+
self.inflight_file = Some(file);
257+
Ok((
258+
VhostUserInflight {
259+
mmap_size: 0x1000,
260+
mmap_offset: 0,
261+
num_queues: inflight.num_queues,
262+
queue_size: inflight.queue_size,
263+
},
264+
fd,
265+
))
266+
}
267+
268+
fn set_inflight_fd(&mut self, _inflight: &VhostUserInflight, _file: File) -> Result<()> {
269+
Ok(())
270+
}
271+
248272
fn get_max_mem_slots(&mut self) -> Result<u64> {
249273
Ok(MAX_MEM_SLOTS as u64)
250274
}

src/vhost_user/master.rs

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
//! Traits and Struct for vhost-user master.
55
6+
use std::fs::File;
67
use std::mem;
7-
use std::os::unix::io::{AsRawFd, RawFd};
8+
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
89
use std::os::unix::net::UnixStream;
910
use std::path::Path;
1011
use std::sync::{Arc, Mutex, MutexGuard};
@@ -51,6 +52,15 @@ pub trait VhostUserMaster: VhostBackend {
5152
/// Setup slave communication channel.
5253
fn set_slave_request_fd(&mut self, fd: RawFd) -> Result<()>;
5354

55+
/// Retrieve shared buffer for inflight I/O tracking.
56+
fn get_inflight_fd(
57+
&mut self,
58+
inflight: &VhostUserInflight,
59+
) -> Result<(VhostUserInflight, File)>;
60+
61+
/// Set shared buffer for inflight I/O tracking.
62+
fn set_inflight_fd(&mut self, inflight: &VhostUserInflight, fd: RawFd) -> Result<()>;
63+
5464
/// Query the maximum amount of memory slots supported by the backend.
5565
fn get_max_mem_slots(&mut self) -> Result<u64>;
5666

@@ -452,6 +462,47 @@ impl VhostUserMaster for Master {
452462
Ok(())
453463
}
454464

465+
fn get_inflight_fd(
466+
&mut self,
467+
inflight: &VhostUserInflight,
468+
) -> Result<(VhostUserInflight, File)> {
469+
let mut node = self.node();
470+
if node.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() == 0 {
471+
return error_code(VhostUserError::InvalidOperation);
472+
}
473+
474+
let hdr = node.send_request_with_body(MasterReq::GET_INFLIGHT_FD, inflight, None)?;
475+
let (inflight, fds) = node.recv_reply_with_fds::<VhostUserInflight>(&hdr)?;
476+
477+
if let Some(fds) = &fds {
478+
if fds.len() == 1 && fds[0] >= 0 {
479+
// Safe because we know the fd is valid.
480+
let file = unsafe { File::from_raw_fd(fds[0]) };
481+
return Ok((inflight, file));
482+
}
483+
}
484+
485+
// Make sure to close the fds before returning the error.
486+
Endpoint::<MasterReq>::close_rfds(fds);
487+
488+
error_code(VhostUserError::IncorrectFds)
489+
}
490+
491+
fn set_inflight_fd(&mut self, inflight: &VhostUserInflight, fd: RawFd) -> Result<()> {
492+
let mut node = self.node();
493+
if node.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits() == 0 {
494+
return error_code(VhostUserError::InvalidOperation);
495+
}
496+
497+
if inflight.mmap_size == 0 || inflight.num_queues == 0 || inflight.queue_size == 0 || fd < 0
498+
{
499+
return error_code(VhostUserError::InvalidParam);
500+
}
501+
502+
let hdr = node.send_request_with_body(MasterReq::SET_INFLIGHT_FD, inflight, Some(&[fd]))?;
503+
node.wait_for_ack(&hdr).map_err(|e| e.into())
504+
}
505+
455506
fn get_max_mem_slots(&mut self) -> Result<u64> {
456507
let mut node = self.node();
457508
if node.acked_protocol_features & VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits() == 0
@@ -646,6 +697,23 @@ impl MasterInternal {
646697
Ok(body)
647698
}
648699

700+
fn recv_reply_with_fds<T: Sized + Default + VhostUserMsgValidator>(
701+
&mut self,
702+
hdr: &VhostUserMsgHeader<MasterReq>,
703+
) -> VhostUserResult<(T, Option<Vec<RawFd>>)> {
704+
if mem::size_of::<T>() > MAX_MSG_SIZE || hdr.is_reply() {
705+
return Err(VhostUserError::InvalidParam);
706+
}
707+
self.check_state()?;
708+
709+
let (reply, body, rfds) = self.main_sock.recv_body::<T>()?;
710+
if !reply.is_reply_for(&hdr) || rfds.is_none() || !body.is_valid() {
711+
Endpoint::<MasterReq>::close_rfds(rfds);
712+
return Err(VhostUserError::InvalidMessage);
713+
}
714+
Ok((body, rfds))
715+
}
716+
649717
fn recv_reply_with_payload<T: Sized + Default + VhostUserMsgValidator>(
650718
&mut self,
651719
hdr: &VhostUserMsgHeader<MasterReq>,

src/vhost_user/message.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,42 @@ impl VhostUserMsgValidator for VhostUserConfig {
673673
/// Payload for the VhostUserConfig message.
674674
pub type VhostUserConfigPayload = Vec<u8>;
675675

676+
/// Single memory region descriptor as payload for ADD_MEM_REG and REM_MEM_REG
677+
/// requests.
678+
#[repr(C)]
679+
#[derive(Default, Clone)]
680+
pub struct VhostUserInflight {
681+
/// Size of the area to track inflight I/O.
682+
pub mmap_size: u64,
683+
/// Offset of this area from the start of the supplied file descriptor.
684+
pub mmap_offset: u64,
685+
/// Number of virtqueues.
686+
pub num_queues: u16,
687+
/// Size of virtqueues.
688+
pub queue_size: u16,
689+
}
690+
691+
impl VhostUserInflight {
692+
/// Create a new instance.
693+
pub fn new(mmap_size: u64, mmap_offset: u64, num_queues: u16, queue_size: u16) -> Self {
694+
VhostUserInflight {
695+
mmap_size,
696+
mmap_offset,
697+
num_queues,
698+
queue_size,
699+
}
700+
}
701+
}
702+
703+
impl VhostUserMsgValidator for VhostUserInflight {
704+
fn is_valid(&self) -> bool {
705+
if self.num_queues == 0 || self.queue_size == 0 {
706+
return false;
707+
}
708+
true
709+
}
710+
}
711+
676712
/*
677713
* TODO: support dirty log, live migration and IOTLB operations.
678714
#[repr(packed)]

src/vhost_user/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,11 @@ mod tests {
307307
VhostUserProtocolFeatures::all().bits()
308308
);
309309

310+
// get_inflight_fd()
311+
slave.handle_request().unwrap();
312+
// set_inflight_fd()
313+
slave.handle_request().unwrap();
314+
310315
// get_queue_num()
311316
slave.handle_request().unwrap();
312317

@@ -359,6 +364,19 @@ mod tests {
359364
assert_eq!(features.bits(), VhostUserProtocolFeatures::all().bits());
360365
master.set_protocol_features(features).unwrap();
361366

367+
// Retrieve inflight I/O tracking information
368+
let (inflight_info, inflight_file) = master
369+
.get_inflight_fd(&VhostUserInflight {
370+
num_queues: 2,
371+
queue_size: 256,
372+
..Default::default()
373+
})
374+
.unwrap();
375+
// Set the buffer back to the backend
376+
master
377+
.set_inflight_fd(&inflight_info, inflight_file.as_raw_fd())
378+
.unwrap();
379+
362380
let num = master.get_queue_num().unwrap();
363381
assert_eq!(num, 2);
364382

src/vhost_user/slave_req_handler.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright (C) 2019 Alibaba Cloud Computing. All rights reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
use std::fs::File;
45
use std::mem;
56
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
67
use std::os::unix::net::UnixStream;
@@ -62,6 +63,8 @@ pub trait VhostUserSlaveReqHandler {
6263
fn get_config(&self, offset: u32, size: u32, flags: VhostUserConfigFlags) -> Result<Vec<u8>>;
6364
fn set_config(&self, offset: u32, buf: &[u8], flags: VhostUserConfigFlags) -> Result<()>;
6465
fn set_slave_req_fd(&self, _vu_req: SlaveFsCacheReq) {}
66+
fn get_inflight_fd(&self, inflight: &VhostUserInflight) -> Result<(VhostUserInflight, RawFd)>;
67+
fn set_inflight_fd(&self, inflight: &VhostUserInflight, file: File) -> Result<()>;
6568
fn get_max_mem_slots(&self) -> Result<u64>;
6669
fn add_mem_region(&self, region: &VhostUserSingleMemoryRegion, fd: RawFd) -> Result<()>;
6770
fn remove_mem_region(&self, region: &VhostUserSingleMemoryRegion) -> Result<()>;
@@ -105,6 +108,11 @@ pub trait VhostUserSlaveReqHandlerMut {
105108
) -> Result<Vec<u8>>;
106109
fn set_config(&mut self, offset: u32, buf: &[u8], flags: VhostUserConfigFlags) -> Result<()>;
107110
fn set_slave_req_fd(&mut self, _vu_req: SlaveFsCacheReq) {}
111+
fn get_inflight_fd(
112+
&mut self,
113+
inflight: &VhostUserInflight,
114+
) -> Result<(VhostUserInflight, RawFd)>;
115+
fn set_inflight_fd(&mut self, inflight: &VhostUserInflight, file: File) -> Result<()>;
108116
fn get_max_mem_slots(&mut self) -> Result<u64>;
109117
fn add_mem_region(&mut self, region: &VhostUserSingleMemoryRegion, fd: RawFd) -> Result<()>;
110118
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> Result<()>;
@@ -197,6 +205,14 @@ impl<T: VhostUserSlaveReqHandlerMut> VhostUserSlaveReqHandler for Mutex<T> {
197205
self.lock().unwrap().set_slave_req_fd(vu_req)
198206
}
199207

208+
fn get_inflight_fd(&self, inflight: &VhostUserInflight) -> Result<(VhostUserInflight, RawFd)> {
209+
self.lock().unwrap().get_inflight_fd(inflight)
210+
}
211+
212+
fn set_inflight_fd(&self, inflight: &VhostUserInflight, file: File) -> Result<()> {
213+
self.lock().unwrap().set_inflight_fd(inflight, file)
214+
}
215+
200216
fn get_max_mem_slots(&self) -> Result<u64> {
201217
self.lock().unwrap().get_max_mem_slots()
202218
}
@@ -435,6 +451,41 @@ impl<S: VhostUserSlaveReqHandler> SlaveReqHandler<S> {
435451
self.check_request_size(&hdr, size, hdr.get_size() as usize)?;
436452
self.set_slave_req_fd(&hdr, rfds)?;
437453
}
454+
MasterReq::GET_INFLIGHT_FD => {
455+
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits()
456+
== 0
457+
{
458+
return Err(Error::InvalidOperation);
459+
}
460+
461+
let msg = self.extract_request_body::<VhostUserInflight>(&hdr, size, &buf)?;
462+
let (inflight, fd) = self.backend.get_inflight_fd(&msg)?;
463+
let reply_hdr = self.new_reply_header::<VhostUserInflight>(&hdr, 0)?;
464+
self.main_sock
465+
.send_message(&reply_hdr, &inflight, Some(&[fd]))?;
466+
}
467+
MasterReq::SET_INFLIGHT_FD => {
468+
if self.acked_protocol_features & VhostUserProtocolFeatures::INFLIGHT_SHMFD.bits()
469+
== 0
470+
{
471+
return Err(Error::InvalidOperation);
472+
}
473+
let file = if let Some(fds) = rfds {
474+
if fds.len() != 1 || fds[0] < 0 {
475+
Endpoint::<MasterReq>::close_rfds(Some(fds));
476+
return Err(Error::IncorrectFds);
477+
}
478+
479+
// Safe because we know the fd is valid.
480+
unsafe { File::from_raw_fd(fds[0]) }
481+
} else {
482+
return Err(Error::IncorrectFds);
483+
};
484+
485+
let msg = self.extract_request_body::<VhostUserInflight>(&hdr, size, &buf)?;
486+
let res = self.backend.set_inflight_fd(&msg, file);
487+
self.send_ack_message(&hdr, res)?;
488+
}
438489
MasterReq::GET_MAX_MEM_SLOTS => {
439490
if self.acked_protocol_features
440491
& VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS.bits()

0 commit comments

Comments
 (0)