Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DProxy #4462

Merged
merged 76 commits into from
Mar 21, 2024
Merged

DProxy #4462

Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
dd3bdb5
First implementation of DProxy
john-sharratt Jan 3, 2024
3b54706
Merge remote-tracking branch 'origin/master' into dproxy
john-sharratt Jan 4, 2024
1c646e2
Simplifications of DProxy
john-sharratt Jan 4, 2024
09b7556
Fixed an issue where the runtime would not load properly in DProxy
john-sharratt Jan 5, 2024
d82b1dc
Simplified the loopback listen method so that it fixes a multiplexing…
john-sharratt Jan 5, 2024
6575b12
Merge remote-tracking branch 'origin/master' into dproxy
john-sharratt Jan 7, 2024
92ef764
Refactoring of the rewind handling for DProxy
john-sharratt Jan 7, 2024
4f2e4a6
Fixed an issue where the caching compiled modules were not saving pro…
john-sharratt Jan 8, 2024
fbfbcef
Some minor journal optimizations for a restored state
john-sharratt Jan 9, 2024
13203fd
Renamed InstanceSnapshot to StoreSnapshot which better reflects its p…
john-sharratt Jan 9, 2024
bb1b647
No longer partially restoring orphaned snapshots
john-sharratt Jan 10, 2024
6ea2ab2
Added the capability for the file system to index the data from a mma…
john-sharratt Jan 17, 2024
a40d69b
Added unit tests for offload file
john-sharratt Jan 21, 2024
026636d
Fixed some alignment and serialization issues on the journals
john-sharratt Jan 22, 2024
1fab2ad
Fixed an issue with a lock on the file system
john-sharratt Jan 22, 2024
88c984d
Fixes for the journal fd seed
john-sharratt Jan 22, 2024
41528eb
Fixed some compile issues
john-sharratt Jan 22, 2024
020fcac
Finally fixed the journal alignment issues, once and for all
john-sharratt Jan 22, 2024
2fe6edc
Finished a mounting feature for the journals
john-sharratt Jan 22, 2024
5f4d2db
Now using COW on aligned vectors and strings
john-sharratt Jan 22, 2024
173b07f
Significant improvement in write performance
john-sharratt Jan 23, 2024
e12e146
Finished a progress bar for the loading of a journal
john-sharratt Jan 23, 2024
8aa4169
Significant speedup of the extent table loading
john-sharratt Jan 23, 2024
ef2d7dc
Fixed a bug where file sizes could not be set
john-sharratt Jan 23, 2024
66374be
Fixed the IO operations on file system
john-sharratt Jan 25, 2024
17be193
Fixed the loading of mounted file systems with static web servers
john-sharratt Feb 5, 2024
28b9519
Fixed a bug where journals that have incomplete write operations no l…
john-sharratt Feb 5, 2024
00ac9fa
Fixed an issue with the file system transversal on mounting file systems
john-sharratt Feb 5, 2024
b3dca80
Fixed an issue where a terminating instance does not abort the connec…
john-sharratt Feb 5, 2024
45d9809
DProxy now correctly snapshots threads to the journal
john-sharratt Feb 6, 2024
a467536
The system now snapshots when a workload goes idle
john-sharratt Feb 6, 2024
71000db
Optimized the memory snapshots to reduce wastage
john-sharratt Feb 6, 2024
3b2ec3a
The memory snapshot functionality is now much faster using region com…
john-sharratt Feb 6, 2024
967b83a
Added a todo on using dirty pages to optimize the hashing time when s…
john-sharratt Feb 6, 2024
36abb81
Fixed the remote sockets
john-sharratt Feb 7, 2024
aa64c36
Fixed the writing of the memory hashes to the bitmap
john-sharratt Feb 7, 2024
d93a2fd
Added extra data for the threads so they can be more easily recreated
john-sharratt Feb 8, 2024
3eeb86f
Added the functionality that spawns threads
john-sharratt Feb 8, 2024
c187126
Now spawning the sub threads correctly on journal resume
john-sharratt Feb 8, 2024
da21864
Fixed an issue where the main thread was not rewinding properly
john-sharratt Feb 8, 2024
f2dfd5c
Fixed an issue where the threads were not properly rewinding on journals
john-sharratt Feb 8, 2024
354e38c
Failed WASM processes now properly terminate running connections
john-sharratt Feb 8, 2024
d7255c4
Fixed the thread resuming however there is now a tokio runtime problem
john-sharratt Feb 8, 2024
d034c50
Resolved merge conflicts
john-sharratt Feb 13, 2024
1fcf56b
Journal now properly cleans up when a snapshot is missing
john-sharratt Feb 20, 2024
6216634
Fixed the final pieces for a single threaded dproxy
john-sharratt Feb 22, 2024
2cbfb91
Merge remote-tracking branch 'origin/master' into dproxy
john-sharratt Feb 22, 2024
de2606d
Correctly connected up the signal process for the ctrl-c key
john-sharratt Feb 25, 2024
bb430fe
Now taking snapshots on signal events however its not waking properly…
john-sharratt Feb 25, 2024
68fedb8
Fixed an issue where ctrl-c was not triggering during deep threading …
john-sharratt Feb 25, 2024
3757754
Renaming some of the spawn functions to make them easier to understand
john-sharratt Feb 25, 2024
f68f58d
Renamed the journal syscall player
john-sharratt Feb 25, 2024
bd95ef3
Added another struct for RemoteSocket to make it simplier
john-sharratt Feb 25, 2024
9b38ee5
Added a comment on one of the apply thread methods
john-sharratt Feb 25, 2024
049f505
Added comments on VIRUTAL_FD_ROOT
john-sharratt Feb 25, 2024
2184922
Missed one
john-sharratt Feb 25, 2024
bc520d4
Added additional comments
john-sharratt Feb 25, 2024
09672e9
Fixed a whole bunch of linting errors
john-sharratt Feb 25, 2024
d064818
Fixed some compile issues for JS
john-sharratt Feb 26, 2024
192f3e8
Merge remote-tracking branch 'origin/master' into dproxy
john-sharratt Feb 26, 2024
573b342
Linting fix
john-sharratt Feb 26, 2024
502957f
More minor linting fixes
john-sharratt Feb 26, 2024
0b04236
Removed fuse as a default dependency as it does not work on all platf…
john-sharratt Feb 26, 2024
51e2e75
More linting fixes
john-sharratt Feb 26, 2024
fc85d90
More linting fixes
john-sharratt Feb 26, 2024
62b7c72
Fixed some unit tests
john-sharratt Feb 26, 2024
0a6abfd
Fixed some unit tests
john-sharratt Feb 26, 2024
329f972
Fixed one of the lints that was missed
john-sharratt Feb 26, 2024
de476a6
Linting and unit test fixes
john-sharratt Feb 26, 2024
46914b9
Fixed multithreading for journals
john-sharratt Feb 26, 2024
f0290c8
Fixed yet another linting issue
john-sharratt Feb 26, 2024
887e386
Merge branch 'master' into dproxy
john-sharratt Feb 27, 2024
1ee2392
Merge branch 'master' into dproxy
john-sharratt Feb 27, 2024
028a5a8
Exposing WASIX method so that it can be consumed by custom task managers
john-sharratt Feb 27, 2024
03678c9
Merge branch 'master' into dproxy
john-sharratt Mar 11, 2024
c09b105
Merge branch 'master' into dproxy
john-sharratt Mar 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions lib/wasix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tokio = { version = "1", features = [
], default_features = false }
tokio-stream = { version = "0.1", features = [ "sync" ] }
futures = { version = "0.3" }
async-ctrlc = { version = "1.2.0", features = [ "stream" ], optional = true }
john-sharratt marked this conversation as resolved.
Show resolved Hide resolved
# used by feature='os'
async-trait = { version = "^0.1" }
urlencoding = { version = "^2" }
Expand Down Expand Up @@ -141,6 +142,7 @@ wasmer = { path = "../api", version = "=4.2.5", default-features = false, featur
default = ["sys-default"]

time = ["tokio/time"]
ctrlc = ["dep:async-ctrlc"]

webc_runner_rt_wcgi = ["hyper", "wcgi", "wcgi-host", "tower", "tower-http"]
webc_runner_rt_dcgi = ["webc_runner_rt_wcgi", "journal"]
Expand All @@ -158,6 +160,7 @@ sys-default = [
"host-vnet",
"host-threads",
"host-reqwest",
"ctrlc"
]
sys-poll = []
sys-thread = ["tokio/rt", "tokio/time", "tokio/rt-multi-thread", "rusty_pool"]
Expand Down
119 changes: 84 additions & 35 deletions lib/wasix/src/os/task/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::{
signal::{SignalDeliveryError, SignalHandlerAbi},
task_join_handle::OwnedTaskStatus,
thread::WasiMemoryLayout,
TaskStatus,
};

/// Represents the ID of a sub-process
Expand Down Expand Up @@ -147,6 +148,8 @@ impl Into<Range<u64>> for MemorySnapshotRegion {
pub struct WasiProcessInner {
/// Unique ID of this process
pub pid: WasiProcessId,
/// Number of threads waiting for children to exit
pub(crate) waiting: Arc<AtomicU32>,
/// The threads that make up this process
pub threads: HashMap<WasiThreadId, WasiThread>,
/// Number of threads running for this process
Expand All @@ -158,6 +161,9 @@ pub struct WasiProcessInner {
/// Represents a checkpoint which blocks all the threads
/// and then executes some maintenance action
pub checkpoint: WasiProcessCheckpoint,
/// Flag that indicates that the process should snapshot
/// itself and exit when the ctrl-c key is pressed
pub snapshot_on_sigint: bool,
/// Any wakers waiting on this process (for example for a checkpoint)
pub wakers: Vec<Waker>,
/// The snapshot memory significantly reduce the amount of
Expand Down Expand Up @@ -331,26 +337,47 @@ impl Drop for WasiProcessWait {

impl WasiProcess {
pub fn new(pid: WasiProcessId, module_hash: ModuleHash, plane: WasiControlPlaneHandle) -> Self {
let waiting = Arc::new(AtomicU32::new(0));
let inner = Arc::new((
Mutex::new(WasiProcessInner {
pid,
threads: Default::default(),
thread_count: Default::default(),
signal_intervals: Default::default(),
children: Default::default(),
checkpoint: WasiProcessCheckpoint::Execute,
wakers: Default::default(),
waiting: waiting.clone(),
snapshot_on_sigint: false,
snapshot_memory_hash: Default::default(),
}),
Condvar::new(),
));

#[derive(Debug)]
struct SignalHandler(LockableWasiProcessInner);
impl SignalHandlerAbi for SignalHandler {
fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError> {
if let Ok(signal) = signal.try_into() {
signal_process_internal(&self.0, signal);
Ok(())
} else {
Err(SignalDeliveryError)
}
}
}

WasiProcess {
pid,
module_hash,
parent: None,
compute: plane,
inner: Arc::new((
Mutex::new(WasiProcessInner {
pid,
threads: Default::default(),
thread_count: Default::default(),
signal_intervals: Default::default(),
children: Default::default(),
checkpoint: WasiProcessCheckpoint::Execute,
wakers: Default::default(),
snapshot_memory_hash: Default::default(),
}),
Condvar::new(),
)),
finished: Arc::new(OwnedTaskStatus::default()),
waiting: Arc::new(AtomicU32::new(0)),
inner: inner.clone(),
finished: Arc::new(
OwnedTaskStatus::new(TaskStatus::Pending)
.with_signal_handler(Arc::new(SignalHandler(inner))),
),
waiting,
}
}

Expand Down Expand Up @@ -471,26 +498,7 @@ impl WasiProcess {

/// Signals all the threads in this process
pub fn signal_process(&self, signal: Signal) {
let pid = self.pid();
tracing::trace!(%pid, "signal-process({:?})", signal);

{
let inner = self.inner.0.lock().unwrap();
if self.waiting.load(Ordering::Acquire) > 0 {
let mut triggered = false;
for child in inner.children.iter() {
child.signal_process(signal);
triggered = true;
}
if triggered {
return;
}
}
}
let inner = self.inner.0.lock().unwrap();
for thread in inner.threads.values() {
thread.signal(signal);
}
signal_process_internal(&self.inner, signal);
}

/// Signals one of the threads every interval
Expand Down Expand Up @@ -606,6 +614,47 @@ impl WasiProcess {
}
}

/// Signals all the threads in this process
fn signal_process_internal(process: &LockableWasiProcessInner, signal: Signal) {
let mut inner = process.0.lock().unwrap();
let pid = inner.pid;
tracing::trace!(%pid, "signal-process({:?})", signal);

// If the snapshot on ctrl-c is currently registered then we need
// to take a snapshot and exit
if inner.snapshot_on_sigint {
tracing::debug!(%pid, "snapshot-on-interrupt-signal");

// Initiate the checksum
inner.checkpoint = WasiProcessCheckpoint::Snapshot {
trigger: SnapshotTrigger::Sigint,
};
for waker in inner.wakers.drain(..) {
waker.wake();
}
process.1.notify_all();
return;
}

// Check if there are subprocesses that will receive this signal
// instead of this process
if inner.waiting.load(Ordering::Acquire) > 0 {
let mut triggered = false;
for child in inner.children.iter() {
child.signal_process(signal);
triggered = true;
}
if triggered {
return;
}
}

// Otherwise just send the signal to all the threads
for thread in inner.threads.values() {
thread.signal(signal);
}
}

impl SignalHandlerAbi for WasiProcess {
fn signal(&self, sig: u8) -> Result<(), SignalDeliveryError> {
if let Ok(sig) = sig.try_into() {
Expand Down
31 changes: 30 additions & 1 deletion lib/wasix/src/os/task/signal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use wasmer_wasix_types::types::Signal;

Expand All @@ -15,6 +15,8 @@ where
fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError>;
}

pub type DynSignalHandlerAbi = dyn SignalHandlerAbi + Send + Sync + 'static;

#[derive(Debug)]
pub struct WasiSignalInterval {
/// Signal that will be raised
Expand All @@ -26,3 +28,30 @@ pub struct WasiSignalInterval {
/// Last time that a signal was triggered
pub last_signal: u128,
}

pub fn default_signal_handler() -> Arc<DynSignalHandlerAbi> {
#[derive(Debug)]
struct DefaultHandler {}
impl SignalHandlerAbi for DefaultHandler {
fn signal(&self, signal: u8) -> Result<(), SignalDeliveryError> {
if let Ok(signal) = TryInto::<Signal>::try_into(signal) {
match signal {
Signal::Sigkill
| Signal::Sigterm
| Signal::Sigabrt
| Signal::Sigquit
| Signal::Sigint
| Signal::Sigstop => {
tracing::debug!("handling terminate signal");
std::process::exit(1);
}
signal => tracing::info!("unhandled signal - {:?}", signal),
}
} else {
tracing::info!("unknown signal - {}", signal)
}
Ok(())
}
}
Arc::new(DefaultHandler {})
}
42 changes: 42 additions & 0 deletions lib/wasix/src/os/task/task_join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use wasmer_wasix_types::wasi::{Errno, ExitCode};

use crate::WasiRuntimeError;

use super::signal::{default_signal_handler, DynSignalHandlerAbi};

#[derive(Clone, Debug)]
pub enum TaskStatus {
Pending,
Expand Down Expand Up @@ -70,6 +72,9 @@ pub trait VirtualTaskHandle: std::fmt::Debug + Send + Sync + 'static {
/// A handle that allows awaiting the termination of a task, and retrieving its exit code.
#[derive(Debug)]
pub struct OwnedTaskStatus {
// The signal handler that can be invoked for this owned task
signal_handler: Arc<DynSignalHandlerAbi>,

watch_tx: tokio::sync::watch::Sender<TaskStatus>,
// Even through unused, without this receive there is a race condition
// where the previously sent values are lost.
Expand All @@ -81,11 +86,23 @@ impl OwnedTaskStatus {
pub fn new(status: TaskStatus) -> Self {
let (tx, rx) = tokio::sync::watch::channel(status);
Self {
signal_handler: default_signal_handler(),
watch_tx: tx,
watch_rx: rx,
}
}

/// Sets the signal handler used for this owned task
pub fn set_signal_handler(&mut self, handler: Arc<DynSignalHandlerAbi>) {
self.signal_handler = handler;
}

/// Attaches a signal handler
pub fn with_signal_handler(mut self, handler: Arc<DynSignalHandlerAbi>) -> Self {
self.set_signal_handler(handler);
self
}

pub fn new_finished_with_code(code: ExitCode) -> Self {
Self::new(TaskStatus::Finished(Ok(code)))
}
Expand Down Expand Up @@ -144,6 +161,7 @@ impl OwnedTaskStatus {

pub fn handle(&self) -> TaskJoinHandle {
TaskJoinHandle {
signal_handler: self.signal_handler.clone(),
watch: self.watch_tx.subscribe(),
}
}
Expand All @@ -158,6 +176,7 @@ impl Default for OwnedTaskStatus {
/// A handle that allows awaiting the termination of a task, and retrieving its exit code.
#[derive(Clone, Debug)]
pub struct TaskJoinHandle {
signal_handler: Arc<DynSignalHandlerAbi>,
watch: tokio::sync::watch::Receiver<TaskStatus>,
}

Expand All @@ -167,6 +186,29 @@ impl TaskJoinHandle {
self.watch.borrow().clone()
}

#[cfg(feature = "ctrlc")]
pub fn install_ctrlc_handler(&self) {
use wasmer::FromToNativeWasmType;
use wasmer_wasix_types::wasi::Signal;

let signal_handler = self.signal_handler.clone();

let mut ctrlc = async_ctrlc::CtrlC::new().expect("cannot create Ctrl+C handler");
let task_handle = self.clone();
tokio::spawn(async move {
// Loop sending ctrl-c presses as signals to the signal handler
loop {
let ctrlc_pin = &mut ctrlc;
ctrlc_pin.await;

if let Err(err) = signal_handler.signal(Signal::Sigint.to_native() as u8) {
tracing::error!("failed to process signal - {}", err);
std::process::exit(1);
}
}
});
}

/// Wait until the task finishes.
pub async fn wait_finished(&mut self) -> Result<ExitCode, Arc<WasiRuntimeError>> {
loop {
Expand Down
3 changes: 3 additions & 0 deletions lib/wasix/src/runners/wasi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ impl crate::runners::Runner for WasiRunner {
.await
.context("Spawn failed")?;

#[cfg(feature = "ctrlc")]
task_handle.install_ctrlc_handler();

task_handle
.wait_finished()
.await
Expand Down
Loading
Loading