diff --git a/lib/cli/src/commands/journal/extract.rs b/lib/cli/src/commands/journal/extract.rs new file mode 100644 index 00000000000..20dd30c368f --- /dev/null +++ b/lib/cli/src/commands/journal/extract.rs @@ -0,0 +1,47 @@ +use std::path::PathBuf; + +use clap::Parser; +use wasmer_wasix::journal::{copy_journal, LogFileJournal}; + +use crate::commands::CliCommand; + +#[derive(Debug, Parser)] +pub struct CmdExtractWhatMemory { + /// Path to the memory file that will be updated using this journal + #[clap(index = 1)] + memory_file_path: PathBuf, +} + +/// What to extract from the journal +#[derive(clap::Subcommand, Debug)] +pub enum CmdExtractWhat { + Memory(CmdExtractWhatMemory), +} + +/// Extracts an element from the journal +#[derive(Debug, Parser)] +pub struct CmdJournalExtract { + /// Path to the journal that the element will be extracted from + #[clap(index = 1)] + journal_path: PathBuf, + + #[clap(subcommand)] + what: CmdExtractWhat, +} + +impl CliCommand for CmdJournalExtract { + type Output = (); + + fn run(self) -> Result<(), anyhow::Error> { + let journal = LogFileJournal::new(&self.journal_path)?; + + match self.what { + CmdExtractWhat::Memory(cmd) => { + let memory_file = + wasmer_wasix::journal::MemFileJournal::new(&cmd.memory_file_path)?; + copy_journal(&journal, &memory_file)?; + } + } + Ok(()) + } +} diff --git a/lib/cli/src/commands/journal/mod.rs b/lib/cli/src/commands/journal/mod.rs index 4e650719576..44253cad529 100644 --- a/lib/cli/src/commands/journal/mod.rs +++ b/lib/cli/src/commands/journal/mod.rs @@ -2,6 +2,7 @@ use crate::commands::CliCommand; mod compact; mod export; +mod extract; mod filter; mod import; mod inspect; @@ -10,6 +11,7 @@ mod mount; pub use compact::*; pub use export::*; +pub use extract::*; pub use filter::*; pub use import::*; pub use inspect::*; @@ -32,6 +34,8 @@ pub enum CmdJournal { /// Mounts the journal at a particular directory #[cfg(feature = "fuse")] Mount(CmdJournalMount), + /// Extracts an element of a journal + Extract(CmdJournalExtract), } impl CliCommand for CmdJournal { @@ -46,6 +50,7 @@ impl CliCommand for CmdJournal { Self::Filter(cmd) => cmd.run(), #[cfg(feature = "fuse")] Self::Mount(cmd) => cmd.run(), + Self::Extract(cmd) => cmd.run(), } } } diff --git a/lib/compiler-cranelift/src/translator/func_environ.rs b/lib/compiler-cranelift/src/translator/func_environ.rs index 16fcbc48fa4..f9e40d5047c 100644 --- a/lib/compiler-cranelift/src/translator/func_environ.rs +++ b/lib/compiler-cranelift/src/translator/func_environ.rs @@ -61,6 +61,7 @@ pub trait TargetEnvironment { } /// Get the size of a native pointer, in bytes. + #[allow(dead_code)] fn pointer_bytes(&self) -> u8 { self.target_config().pointer_bytes() } @@ -470,6 +471,7 @@ pub trait FuncEnvironment: TargetEnvironment { } /// Get the type of the global at the given index. + #[allow(dead_code)] fn get_global_type(&self, global_index: GlobalIndex) -> Option; /// Push a local declaration on to the stack to track the type of locals. @@ -479,14 +481,18 @@ pub trait FuncEnvironment: TargetEnvironment { fn push_params_on_stack(&mut self, function_index: LocalFunctionIndex); /// Get the type of the local at the given index. + #[allow(dead_code)] fn get_local_type(&self, local_index: u32) -> Option; /// Get the types of all the current locals. + #[allow(dead_code)] fn get_local_types(&self) -> &[WasmerType]; /// Get the type of the local at the given index.
+ #[allow(dead_code)] fn get_function_type(&self, function_index: FunctionIndex) -> Option<&FunctionType>; /// Get the type of a function with the given signature index. + #[allow(dead_code)] fn get_function_sig(&self, sig_index: SignatureIndex) -> Option<&FunctionType>; } diff --git a/lib/journal/src/concrete/archived.rs b/lib/journal/src/concrete/archived.rs index d03b8583dd0..19687e273fd 100644 --- a/lib/journal/src/concrete/archived.rs +++ b/lib/journal/src/concrete/archived.rs @@ -1,4 +1,3 @@ -use lz4_flex::block::compress_prepend_size; use num_enum::{IntoPrimitive, TryFromPrimitive}; use rkyv::ser::{ScratchSpace, Serializer}; use rkyv::{Archive, CheckBytes, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize}; @@ -443,13 +442,14 @@ impl<'a> JournalEntry<'a> { JournalEntry::ClearEtherealV1 => { serializer.serialize_value(&JournalEntryClearEtherealV1 {}) } - JournalEntry::UpdateMemoryRegionV1 { region, data } => { - serializer.serialize_value(&JournalEntryUpdateMemoryRegionV1 { - start: region.start, - end: region.end, - compressed_data: compress_prepend_size(data.as_ref()).into(), - }) - } + JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data, + } => serializer.serialize_value(&JournalEntryUpdateMemoryRegionV1 { + start: region.start, + end: region.end, + compressed_data: compressed_data.into(), + }), JournalEntry::ProcessExitV1 { exit_code } => { serializer.serialize_value(&JournalEntryProcessExitV1 { exit_code: exit_code.map(|e| e.into()), diff --git a/lib/journal/src/concrete/archived_from.rs b/lib/journal/src/concrete/archived_from.rs index 853973061bf..5ce521a7f74 100644 --- a/lib/journal/src/concrete/archived_from.rs +++ b/lib/journal/src/concrete/archived_from.rs @@ -1,4 +1,3 @@ -use lz4_flex::block::decompress_size_prepended; use std::borrow::Cow; use std::time::SystemTime; use wasmer_wasix_types::wasi; @@ -663,7 +662,7 @@ impl<'a> TryFrom> for JournalEntry<'a> { }, ) => Self::UpdateMemoryRegionV1 { region: (*start)..(*end), - data: Cow::Owned(decompress_size_prepended(compressed_data.as_ref())?), + compressed_data: Cow::Borrowed(compressed_data.as_ref()), }, ArchivedJournalEntry::ProcessExitV1(ArchivedJournalEntryProcessExitV1 { exit_code, diff --git a/lib/journal/src/concrete/compacting.rs b/lib/journal/src/concrete/compacting.rs index 3626e0533e5..b6d1a650636 100644 --- a/lib/journal/src/concrete/compacting.rs +++ b/lib/journal/src/concrete/compacting.rs @@ -54,6 +54,11 @@ struct State { // Thread events are only maintained while the thread and the // process are still running thread_map: HashMap, + // Sockets that are open and not yet closed are kept here + open_sockets: HashMap, + // Open pipes have two file descriptors that are associated with + // them. 
We keep track of both of them + open_pipes: HashMap, // Any descriptors are assumed to be read only operations until // they actually do something that changes the system suspect_descriptors: HashMap, @@ -123,6 +128,26 @@ impl State { } } } + for (_, l) in self.open_sockets.iter() { + if let Some(d) = self.descriptors.get(l) { + for e in d.events.iter() { + filter.add_event_to_whitelist(*e); + } + for e in d.write_map.values() { + filter.add_event_to_whitelist(*e); + } + } + } + for (_, l) in self.open_pipes.iter() { + if let Some(d) = self.descriptors.get(l) { + for e in d.events.iter() { + filter.add_event_to_whitelist(*e); + } + for e in d.write_map.values() { + filter.add_event_to_whitelist(*e); + } + } + } if has_threads { for (_, l) in self.stdio_descriptors.iter() { if let Some(d) = self.descriptors.get(l) { @@ -181,6 +206,8 @@ impl CompactingJournal { snapshots: Default::default(), memory_map: Default::default(), thread_map: Default::default(), + open_sockets: Default::default(), + open_pipes: Default::default(), create_directory: Default::default(), remove_directory: Default::default(), create_trunc_file: Default::default(), @@ -384,7 +411,14 @@ impl WritableJournal for CompactingJournalTx { // Get the lookup // (if its suspect then it will remove the entry and // thus the entire branch of events it represents is discarded) + let mut skip = false; let lookup = if matches!(&entry, JournalEntry::CloseFileDescriptorV1 { .. }) { + if state.open_sockets.remove(fd).is_some() { + skip = true; + } + if state.open_pipes.remove(fd).is_some() { + skip = true; + } state.suspect_descriptors.remove(fd) } else { state.suspect_descriptors.get(fd).cloned() @@ -393,11 +427,13 @@ impl WritableJournal for CompactingJournalTx { .or_else(|| state.keep_descriptors.get(fd).cloned()) .or_else(|| state.stdio_descriptors.get(fd).cloned()); - if let Some(lookup) = lookup { - let state = state.descriptors.entry(lookup).or_default(); - state.events.push(event_index); - } else { - state.whitelist.insert(event_index); + if !skip { + if let Some(lookup) = lookup { + let state = state.descriptors.entry(lookup).or_default(); + state.events.push(event_index); + } else { + state.whitelist.insert(event_index); + } } } // Things that modify a file descriptor mean that it is @@ -406,7 +442,15 @@ impl WritableJournal for CompactingJournalTx { | JournalEntry::FileDescriptorAllocateV1 { fd, .. } | JournalEntry::FileDescriptorSetFlagsV1 { fd, .. } | JournalEntry::FileDescriptorSetTimesV1 { fd, .. } - | JournalEntry::FileDescriptorWriteV1 { fd, .. } => { + | JournalEntry::FileDescriptorWriteV1 { fd, .. } + | JournalEntry::SocketBindV1 { fd, .. } + | JournalEntry::SocketSendFileV1 { socket_fd: fd, .. } + | JournalEntry::SocketSendToV1 { fd, .. } + | JournalEntry::SocketSendV1 { fd, .. } + | JournalEntry::SocketSetOptFlagV1 { fd, .. } + | JournalEntry::SocketSetOptSizeV1 { fd, .. } + | JournalEntry::SocketSetOptTimeV1 { fd, .. } + | JournalEntry::SocketShutdownV1 { fd, .. 
} => { // Its no longer suspect if let Some(lookup) = state.suspect_descriptors.remove(fd) { state.keep_descriptors.insert(*fd, lookup); @@ -417,6 +461,8 @@ impl WritableJournal for CompactingJournalTx { .suspect_descriptors .get(fd) .cloned() + .or_else(|| state.open_sockets.get(fd).cloned()) + .or_else(|| state.open_pipes.get(fd).cloned()) .or_else(|| state.keep_descriptors.get(fd).cloned()) .or_else(|| state.stdio_descriptors.get(fd).cloned()); @@ -477,6 +523,35 @@ impl WritableJournal for CompactingJournalTx { state.create_directory.remove(&path); state.remove_directory.entry(path).or_insert(event_index); } + // Pipes that remain open at the end will be added + JournalEntry::CreatePipeV1 { fd1, fd2, .. } => { + let lookup = DescriptorLookup(state.descriptor_seed); + state.descriptor_seed += 1; + state.open_pipes.insert(*fd1, lookup); + + state + .descriptors + .entry(lookup) + .or_default() + .events + .push(event_index); + + let lookup = DescriptorLookup(state.descriptor_seed); + state.descriptor_seed += 1; + state.open_pipes.insert(*fd2, lookup); + } + // Sockets that are accepted are suspect + JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => { + let lookup = DescriptorLookup(state.descriptor_seed); + state.descriptor_seed += 1; + state.open_sockets.insert(*fd, lookup); + state + .descriptors + .entry(lookup) + .or_default() + .events + .push(event_index); + } _ => { // The fallthrough is to whitelist the event so that it will // be reflected in the next compaction event diff --git a/lib/journal/src/concrete/mem_file.rs b/lib/journal/src/concrete/mem_file.rs new file mode 100644 index 00000000000..6e19ace60b9 --- /dev/null +++ b/lib/journal/src/concrete/mem_file.rs @@ -0,0 +1,98 @@ +use std::{ + fs::File, + io::{Seek, Write}, + path::Path, + sync::RwLock, +}; + +use lz4_flex::{block, decompress}; + +use super::*; + +/// The memory file journal processes journal entries by writing any memory mutations +/// directly to a file. Later this can be used as a mounting target for resuming +/// a process without having to reload the journal from scratch. +#[derive(Debug)] +pub struct MemFileJournal { + file: RwLock, +} + +impl MemFileJournal { + pub fn new(path: &Path) -> anyhow::Result { + Ok(Self { + file: RwLock::new( + std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(path)?, + ), + }) + } +} + +impl Drop for MemFileJournal { + fn drop(&mut self) { + let _ = self.flush(); + } +} + +impl Clone for MemFileJournal { + fn clone(&self) -> Self { + let file = self.file.read().unwrap(); + Self { + file: RwLock::new(file.try_clone().unwrap()), + } + } +} + +impl ReadableJournal for MemFileJournal { + fn read(&self) -> anyhow::Result>> { + Ok(None) + } + + fn as_restarted(&self) -> anyhow::Result> { + Ok(Box::new(self.clone())) + } +} + +impl WritableJournal for MemFileJournal { + fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result { + let estimated_size = entry.estimate_size() as u64; + match entry { + JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data, + } => { + let (uncompressed_size, compressed_data) = + block::uncompressed_size(&compressed_data)?; + let decompressed_data = decompress(compressed_data, uncompressed_size)?; + + let mut file = self.file.write().unwrap(); + file.seek(std::io::SeekFrom::Start(region.start))?; + file.write_all(&decompressed_data)?; + } + JournalEntry::ProcessExitV1 { .. } | JournalEntry::InitModuleV1 { .. 
} => { + let file = self.file.read().unwrap(); + file.set_len(0)?; + } + _ => {} + } + + Ok(LogWriteResult { + record_start: 0, + record_end: estimated_size, + }) + } + + fn flush(&self) -> anyhow::Result<()> { + let mut file = self.file.write().unwrap(); + file.flush()?; + Ok(()) + } +} + +impl Journal for MemFileJournal { + fn split(self) -> (Box, Box) { + (Box::new(self.clone()), Box::new(self.clone())) + } +} diff --git a/lib/journal/src/concrete/mod.rs b/lib/journal/src/concrete/mod.rs index c517635b2b7..a7a8dfc38b5 100644 --- a/lib/journal/src/concrete/mod.rs +++ b/lib/journal/src/concrete/mod.rs @@ -12,6 +12,7 @@ mod counting; mod filter; #[cfg(feature = "log-file")] mod log_file; +mod mem_file; mod null; mod pipe; mod printing; @@ -33,6 +34,7 @@ pub use counting::*; pub use filter::*; #[cfg(feature = "log-file")] pub use log_file::*; +pub use mem_file::*; pub use null::*; pub use pipe::*; pub use printing::*; diff --git a/lib/journal/src/concrete/printing.rs b/lib/journal/src/concrete/printing.rs index 66bd0863191..2c9868b8e03 100644 --- a/lib/journal/src/concrete/printing.rs +++ b/lib/journal/src/concrete/printing.rs @@ -1,6 +1,7 @@ use std::fmt; use super::*; +use lz4_flex::block::uncompressed_size; use wasmer_wasix_types::wasi; /// Type of printing mode to use @@ -15,8 +16,8 @@ impl Default for JournalPrintingMode { } } -/// The default for runtime is to use the unsupported journal -/// which will fail to write journal entries if one attempts to do so. +/// The printing journal writes all the journal entries to the console +/// as either text or json. #[derive(Debug, Default)] pub struct PrintingJournal { mode: JournalPrintingMode, @@ -75,12 +76,18 @@ impl<'a> fmt::Display for JournalEntry<'a> { JournalEntry::ClearEtherealV1 => { write!(f, "clear-ethereal") } - JournalEntry::UpdateMemoryRegionV1 { region, data } => write!( + JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data, + } => write!( f, - "memory-update (start={}, end={}, data.len={})", + "memory-update (start={}, end={}, data.len={}, compressed.len={})", region.start, region.end, - data.len() + uncompressed_size(compressed_data.as_ref()) + .map(|a| a.0) + .unwrap_or_else(|_| compressed_data.as_ref().len()), + compressed_data.len() ), JournalEntry::ProcessExitV1 { exit_code } => { write!(f, "process-exit (code={:?})", exit_code) diff --git a/lib/journal/src/concrete/tests.rs b/lib/journal/src/concrete/tests.rs index 722d9d1faf9..1e1d8f4a611 100644 --- a/lib/journal/src/concrete/tests.rs +++ b/lib/journal/src/concrete/tests.rs @@ -4,6 +4,7 @@ use std::{ }; use super::*; +use lz4_flex::compress_prepend_size; use rkyv::ser::serializers::{ AllocScratch, CompositeSerializer, SharedSerializeMap, WriteSerializer, }; @@ -115,7 +116,7 @@ pub fn test_record_descriptor_write() { pub fn test_record_update_memory() { run_test(JournalEntry::UpdateMemoryRegionV1 { region: 76u64..8237453u64, - data: [74u8; 40960].to_vec().into(), + compressed_data: compress_prepend_size(&[74u8; 40960]).into(), }); } diff --git a/lib/journal/src/entry.rs b/lib/journal/src/entry.rs index b5e09519c12..6b570cdc3bf 100644 --- a/lib/journal/src/entry.rs +++ b/lib/journal/src/entry.rs @@ -91,7 +91,7 @@ pub enum JournalEntry<'a> { region: Range, #[derivative(Debug = "ignore")] #[serde(with = "base64")] - data: Cow<'a, [u8]>, + compressed_data: Cow<'a, [u8]>, }, ProcessExitV1 { exit_code: Option, @@ -376,9 +376,12 @@ impl<'a> JournalEntry<'a> { match self { Self::InitModuleV1 { wasm_hash } => JournalEntry::InitModuleV1 { wasm_hash }, 
Self::ClearEtherealV1 => JournalEntry::ClearEtherealV1, - Self::UpdateMemoryRegionV1 { region, data } => JournalEntry::UpdateMemoryRegionV1 { + Self::UpdateMemoryRegionV1 { region, - data: data.into_owned().into(), + compressed_data, + } => JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data: compressed_data.into_owned().into(), }, Self::ProcessExitV1 { exit_code } => JournalEntry::ProcessExitV1 { exit_code }, Self::SetThreadV1 { @@ -717,7 +720,9 @@ impl<'a> JournalEntry<'a> { match self { JournalEntry::InitModuleV1 { .. } => base_size, JournalEntry::ClearEtherealV1 => base_size, - JournalEntry::UpdateMemoryRegionV1 { data, .. } => base_size + data.len(), + JournalEntry::UpdateMemoryRegionV1 { + compressed_data, .. + } => base_size + compressed_data.len(), JournalEntry::ProcessExitV1 { .. } => base_size, JournalEntry::SetThreadV1 { call_stack, diff --git a/lib/sys-utils/src/memory/fd_memory/fd_mmap.rs b/lib/sys-utils/src/memory/fd_memory/fd_mmap.rs index b7c718a5856..ea9391b3ab6 100644 --- a/lib/sys-utils/src/memory/fd_memory/fd_mmap.rs +++ b/lib/sys-utils/src/memory/fd_memory/fd_mmap.rs @@ -109,9 +109,6 @@ impl FdMmap { } } - // Compute the flags - let flags = libc::MAP_FILE | libc::MAP_SHARED; - Ok(if accessible_size == mapping_size { // Allocate a single read-write region at once. let ptr = unsafe { @@ -119,7 +116,7 @@ impl FdMmap { ptr::null_mut(), mapping_size, libc::PROT_READ | libc::PROT_WRITE, - flags, + libc::MAP_FILE | libc::MAP_SHARED, fd.0, 0, ) @@ -140,7 +137,7 @@ impl FdMmap { ptr::null_mut(), mapping_size, libc::PROT_NONE, - flags, + libc::MAP_FILE | libc::MAP_SHARED, fd.0, 0, ) diff --git a/lib/vm/src/lib.rs b/lib/vm/src/lib.rs index 3ae29ea05a9..fabdc38a48f 100644 --- a/lib/vm/src/lib.rs +++ b/lib/vm/src/lib.rs @@ -45,7 +45,7 @@ pub use crate::memory::{ initialize_memory_with_data, LinearMemory, NotifyLocation, VMMemory, VMOwnedMemory, VMSharedMemory, }; -pub use crate::mmap::Mmap; +pub use crate::mmap::{Mmap, MmapType}; pub use crate::probestack::PROBESTACK; pub use crate::sig_registry::SignatureRegistry; pub use crate::store::{InternalStoreHandle, MaybeInstanceOwned, StoreHandle, StoreObjects}; diff --git a/lib/vm/src/memory.rs b/lib/vm/src/memory.rs index d9561a4db1f..d3565608cf1 100644 --- a/lib/vm/src/memory.rs +++ b/lib/vm/src/memory.rs @@ -5,6 +5,7 @@ //! //! `Memory` is to WebAssembly linear memories what `Table` is to WebAssembly tables. +use crate::mmap::MmapType; use crate::threadconditions::ThreadConditions; pub use crate::threadconditions::{NotifyLocation, WaiterError}; use crate::trap::Trap; @@ -95,7 +96,8 @@ impl WasmMmap { })?; let mut new_mmap = - Mmap::accessible_reserved(new_bytes, request_bytes).map_err(MemoryError::Region)?; + Mmap::accessible_reserved(new_bytes, request_bytes, None, MmapType::Private) + .map_err(MemoryError::Region)?; let copy_len = self.alloc.len() - conf.offset_guard_size; new_mmap.as_mut_slice()[..copy_len].copy_from_slice(&self.alloc.as_slice()[..copy_len]); @@ -207,7 +209,22 @@ impl VMOwnedMemory { /// This creates a `Memory` with owned metadata: this can be used to create a memory /// that will be imported into Wasm modules. pub fn new(memory: &MemoryType, style: &MemoryStyle) -> Result { - unsafe { Self::new_internal(memory, style, None) } + unsafe { Self::new_internal(memory, style, None, None, MmapType::Private) } + } + + /// Create a new linear memory instance with specified minimum and maximum number of wasm pages + /// that is backed by a memory file. 
When set to private the changes will remain in memory and + /// never be flushed to disk; when set to shared the memory will be flushed to disk. + /// + /// This creates a `Memory` with owned metadata: this can be used to create a memory + /// that will be imported into Wasm modules. + pub fn new_with_file( + memory: &MemoryType, + style: &MemoryStyle, + backing_file: std::fs::File, + memory_type: MmapType, + ) -> Result { + unsafe { Self::new_internal(memory, style, None, Some(backing_file), memory_type) } } /// Create a new linear memory instance with specified minimum and maximum number of wasm pages. /// /// This creates a `Memory` with metadata owned by a VM, pointed to by @@ -222,7 +239,38 @@ impl VMOwnedMemory { style: &MemoryStyle, vm_memory_location: NonNull, ) -> Result { - Self::new_internal(memory, style, Some(vm_memory_location)) + Self::new_internal( + memory, + style, + Some(vm_memory_location), + None, + MmapType::Private, + ) + } + + /// Create a new linear memory instance with specified minimum and maximum number of wasm pages + /// that is backed by a file. When set to private the changes will remain in memory and + /// never be flushed to disk; when set to shared the memory will be flushed to disk. + /// + /// This creates a `Memory` with metadata owned by a VM, pointed to by + /// `vm_memory_location`: this can be used to create a local memory. + /// + /// # Safety + /// - `vm_memory_location` must point to a valid location in VM memory. + pub unsafe fn from_definition_with_file( + memory: &MemoryType, + style: &MemoryStyle, + vm_memory_location: NonNull, + backing_file: Option, + memory_type: MmapType, + ) -> Result { + Self::new_internal( + memory, + style, + Some(vm_memory_location), + backing_file, + memory_type, + ) } /// Build a `Memory` with either self-owned or VM owned metadata. @@ -230,6 +278,8 @@ impl VMOwnedMemory { memory: &MemoryType, style: &MemoryStyle, vm_memory_location: Option>, + backing_file: Option, + memory_type: MmapType, ) -> Result { if memory.minimum > Pages::max_value() { return Err(MemoryError::MinimumMemoryTooLarge { @@ -269,10 +319,16 @@ impl VMOwnedMemory { let mapped_pages = memory.minimum; let mapped_bytes = mapped_pages.bytes(); - let mut alloc = Mmap::accessible_reserved(mapped_bytes.0, request_bytes) - .map_err(MemoryError::Region)?; + let mut alloc = + Mmap::accessible_reserved(mapped_bytes.0, request_bytes, backing_file, memory_type) + .map_err(MemoryError::Region)?; + let base_ptr = alloc.as_mut_ptr(); - let mem_length = memory.minimum.bytes().0; + let mem_length = memory + .minimum + .bytes() + .0 + .max(alloc.as_slice_accessible().len()); let mmap = WasmMmap { vm_memory_definition: if let Some(mem_loc) = vm_memory_location { { @@ -289,7 +345,7 @@ impl VMOwnedMemory { }))) }, alloc, - size: memory.minimum, + size: Bytes::from(mem_length).try_into().unwrap(), }; Ok(Self { @@ -398,6 +454,21 @@ impl VMSharedMemory { Ok(VMOwnedMemory::new(memory, style)?.to_shared()) } + /// Create a new linear memory instance with specified minimum and maximum number of wasm pages + /// that is backed by a file. When set to private the changes will remain in memory and + /// never be flushed to disk; when set to shared the memory will be flushed to disk. + /// + /// This creates a `Memory` with owned metadata: this can be used to create a memory + /// that will be imported into Wasm modules.
+ pub fn new_with_file( + memory: &MemoryType, + style: &MemoryStyle, + backing_file: std::fs::File, + memory_type: MmapType, + ) -> Result { + Ok(VMOwnedMemory::new_with_file(memory, style, backing_file, memory_type)?.to_shared()) + } + /// Create a new linear memory instance with specified minimum and maximum number of wasm pages. /// /// This creates a `Memory` with metadata owned by a VM, pointed to by @@ -413,6 +484,32 @@ impl VMSharedMemory { Ok(VMOwnedMemory::from_definition(memory, style, vm_memory_location)?.to_shared()) } + /// Create a new linear memory instance with specified minimum and maximum number of wasm pages + /// that is backed by a file. When set to private the file will be remaing in memory and + /// never flush to disk, when set to shared the memory will be flushed to disk. + /// + /// This creates a `Memory` with metadata owned by a VM, pointed to by + /// `vm_memory_location`: this can be used to create a local memory. + /// + /// # Safety + /// - `vm_memory_location` must point to a valid location in VM memory. + pub unsafe fn from_definition_with_file( + memory: &MemoryType, + style: &MemoryStyle, + vm_memory_location: NonNull, + backing_file: Option, + memory_type: MmapType, + ) -> Result { + Ok(VMOwnedMemory::from_definition_with_file( + memory, + style, + vm_memory_location, + backing_file, + memory_type, + )? + .to_shared()) + } + /// Copies this memory to a new memory pub fn copy(&mut self) -> Result { let mut guard = self.mmap.write().unwrap(); diff --git a/lib/vm/src/mmap.rs b/lib/vm/src/mmap.rs index 8d4934a665b..c6d82f88308 100644 --- a/lib/vm/src/mmap.rs +++ b/lib/vm/src/mmap.rs @@ -5,7 +5,6 @@ //! of memory. use more_asserts::assert_le; -use more_asserts::assert_lt; use std::io; use std::ptr; use std::slice; @@ -26,6 +25,17 @@ pub struct Mmap { ptr: usize, total_size: usize, accessible_size: usize, + sync_on_drop: bool, +} + +/// The type of mmap to create +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum MmapType { + /// The memory is private to the process and not shared with other processes. + Private, + /// The memory is shared with other processes. This is only supported on Unix. + /// When the memory is flushed it will update the file data. + Shared, } impl Mmap { @@ -39,6 +49,7 @@ impl Mmap { ptr: empty.as_ptr() as usize, total_size: 0, accessible_size: 0, + sync_on_drop: false, } } @@ -46,7 +57,7 @@ impl Mmap { pub fn with_at_least(size: usize) -> Result { let page_size = region::page::size(); let rounded_size = round_up_to_page_size(size, page_size); - Self::accessible_reserved(rounded_size, rounded_size) + Self::accessible_reserved(rounded_size, rounded_size, None, MmapType::Private) } /// Create a new `Mmap` pointing to `accessible_size` bytes of page-aligned accessible memory, @@ -54,9 +65,13 @@ impl Mmap { /// must be native page-size multiples. #[cfg(not(target_os = "windows"))] pub fn accessible_reserved( - accessible_size: usize, + mut accessible_size: usize, mapping_size: usize, + mut backing_file: Option, + memory_type: MmapType, ) -> Result { + use std::os::fd::IntoRawFd; + let page_size = region::page::size(); assert_le!(accessible_size, mapping_size); assert_eq!(mapping_size & (page_size - 1), 0); @@ -68,6 +83,30 @@ impl Mmap { return Ok(Self::new()); } + // If there is a backing file, resize the file so that its at least + // `mapping_size` bytes. 
+ if let Some(backing_file) = &mut backing_file { + let len = backing_file.metadata().map_err(|e| e.to_string())?.len() as usize; + if len < mapping_size { + backing_file + .set_len(mapping_size as u64) + .map_err(|e| e.to_string())?; + } + accessible_size = accessible_size.max(len).min(mapping_size); + } + + let memory_fd = backing_file.map_or(-1, |fd| fd.into_raw_fd()); + + // Compute the flags + let mut flags = match memory_fd { + fd if fd < 0 => libc::MAP_ANON, + _ => libc::MAP_FILE, + }; + flags |= match memory_type { + MmapType::Private => libc::MAP_PRIVATE, + MmapType::Shared => libc::MAP_SHARED, + }; + Ok(if accessible_size == mapping_size { // Allocate a single read-write region at once. let ptr = unsafe { @@ -75,8 +114,8 @@ impl Mmap { ptr::null_mut(), mapping_size, libc::PROT_READ | libc::PROT_WRITE, - libc::MAP_PRIVATE | libc::MAP_ANON, - -1, + flags, + memory_fd, 0, ) }; @@ -88,6 +127,7 @@ impl Mmap { ptr: ptr as usize, total_size: mapping_size, accessible_size, + sync_on_drop: memory_fd != -1 && memory_type == MmapType::Shared, } } else { // Reserve the mapping size. @@ -96,8 +136,8 @@ impl Mmap { ptr::null_mut(), mapping_size, libc::PROT_NONE, - libc::MAP_PRIVATE | libc::MAP_ANON, - -1, + flags, + memory_fd, 0, ) }; @@ -109,6 +149,7 @@ impl Mmap { ptr: ptr as usize, total_size: mapping_size, accessible_size, + sync_on_drop: memory_fd != -1 && memory_type == MmapType::Shared, }; if accessible_size != 0 { @@ -127,6 +168,8 @@ impl Mmap { pub fn accessible_reserved( accessible_size: usize, mapping_size: usize, + _backing_file: Option, + _memory_type: MmapType, ) -> Result { use winapi::um::memoryapi::VirtualAlloc; use winapi::um::winnt::{MEM_COMMIT, MEM_RESERVE, PAGE_NOACCESS, PAGE_READWRITE}; @@ -160,6 +203,7 @@ impl Mmap { ptr: ptr as usize, total_size: mapping_size, accessible_size, + sync_on_drop: false, } } else { // Reserve the mapping size. @@ -173,6 +217,7 @@ impl Mmap { ptr: ptr as usize, total_size: mapping_size, accessible_size, + sync_on_drop: false, }; if accessible_size != 0 { @@ -192,8 +237,8 @@ impl Mmap { let page_size = region::page::size(); assert_eq!(start & (page_size - 1), 0); assert_eq!(len & (page_size - 1), 0); - assert_lt!(len, self.total_size); - assert_lt!(start, self.total_size - len); + assert_le!(len, self.total_size); + assert_le!(start, self.total_size - len); // Commit the accessible size. let ptr = self.ptr as *const u8; @@ -212,8 +257,8 @@ impl Mmap { let page_size = region::page::size(); assert_eq!(start & (page_size - 1), 0); assert_eq!(len & (page_size - 1), 0); - assert_lt!(len, self.len()); - assert_lt!(start, self.len() - len); + assert_le!(len, self.len()); + assert_le!(start, self.len() - len); // Commit the accessible size. 
let ptr = self.ptr as *const u8; @@ -300,7 +345,8 @@ impl Mmap { copy_size = usize::max(copy_size, size_hint); } - let mut new = Self::accessible_reserved(copy_size, self.total_size)?; + let mut new = + Self::accessible_reserved(copy_size, self.total_size, None, MmapType::Private)?; new.as_mut_slice_arbitary(copy_size) .copy_from_slice(self.as_slice_arbitary(copy_size)); Ok(new) } @@ -311,6 +357,16 @@ impl Drop for Mmap { #[cfg(not(target_os = "windows"))] fn drop(&mut self) { if self.total_size != 0 { + if self.sync_on_drop { + let r = unsafe { + libc::msync( + self.ptr as *mut libc::c_void, + self.total_size, + libc::MS_SYNC | libc::MS_INVALIDATE, + ) + }; + assert_eq!(r, 0, "msync failed: {}", io::Error::last_os_error()); + } let r = unsafe { libc::munmap(self.ptr as *mut libc::c_void, self.total_size) }; assert_eq!(r, 0, "munmap failed: {}", io::Error::last_os_error()); } diff --git a/lib/wasix/src/journal/effector/memory_and_snapshot.rs b/lib/wasix/src/journal/effector/memory_and_snapshot.rs index ca7aa11e109..6804e46f9c5 100644 --- a/lib/wasix/src/journal/effector/memory_and_snapshot.rs +++ b/lib/wasix/src/journal/effector/memory_and_snapshot.rs @@ -1,5 +1,10 @@ use std::collections::{hash_map, BTreeMap}; +#[allow(unused)] +use lz4_flex::{ + self, block, compress_prepend_size, decompress, decompress_into, decompress_size_prepended, +}; + use crate::os::task::process::MemorySnapshotRegion; use super::*; @@ -47,10 +52,11 @@ impl JournalEffector { let mut cur = 0u64; let mut regions = Vec::::new(); while cur < memory.data_size() { - let mut again = false; + //let mut again = false; let next = ((cur + MEMORY_REGION_RESOLUTION) / MEMORY_REGION_RESOLUTION) * MEMORY_REGION_RESOLUTION; - let mut end = memory.data_size().min(next); + let end = memory.data_size().min(next); + /* for (_, thread) in guard.threads.iter() { let layout = thread.memory_layout(); if cur >= layout.stack_lower && cur < layout.stack_upper { @@ -65,6 +71,7 @@ impl JournalEffector { if again { continue; } + */ let region = cur..end; regions.push(region.into()); @@ -148,15 +155,28 @@ impl JournalEffector { // We grab this region of memory as a vector and hash // it, which allows us to make some logging efficiency // gains. - let data = memory - .copy_range_to_vec(region.into()) - .map_err(mem_error_to_wasi)?; + #[cfg(not(feature = "sys"))] + let compressed_data = compress_prepend_size( + &memory + .copy_range_to_vec(region.into()) + .map_err(mem_error_to_wasi)?, + ); + + // UNSAFE: + // + // This is only unsafe while the WASM process itself is running. Using this + // method avoids a memory copy before it's compressed, which also significantly + // reduces the memory usage of the process + #[cfg(feature = "sys")] + let compressed_data = compress_prepend_size(unsafe { + &memory.data_unchecked()[region.start as usize..region.end as usize] + }); // Now we write it to the snap snapshot capturer journal .write(JournalEntry::UpdateMemoryRegionV1 { region: region.into(), - data: data.into(), + compressed_data: compressed_data.into(), }) .map_err(map_snapshot_err)?; } @@ -179,45 +199,87 @@ impl JournalEffector { /// This function manipulates the memory of the process and thus must be executed /// by the WASM process thread itself.
/// - pub unsafe fn apply_memory( + pub unsafe fn apply_compressed_memory( ctx: &mut FunctionEnvMut<'_, WasiEnv>, region: Range, - mut data: &[u8], + compressed_data: &[u8], ) -> anyhow::Result<()> { let (env, mut store) = ctx.data_and_store_mut(); + let (uncompressed_size, compressed_data) = block::uncompressed_size(compressed_data) + .map_err(|err| anyhow::anyhow!("failed to decompress - {}", err))?; + let memory = unsafe { env.memory() }; - memory.grow_at_least(&mut store, region.end + data.len() as u64)?; + memory.grow_at_least(&mut store, region.end + uncompressed_size as u64)?; // Write the data to the memory let memory = unsafe { env.memory_view(&store) }; - memory - .write(region.start, data) - .map_err(|err| WasiRuntimeError::Runtime(RuntimeError::user(err.into())))?; - - // Break the region down into chunks that align with the resolution - let mut offset = region.start; - while offset < region.end { - let next = region.end.min(offset + MEMORY_REGION_RESOLUTION); - let region = offset..next; - offset = next; - - // Compute the hash and update it - let size = region.end - region.start; - let hash = { - let h: [u8; 32] = blake3::hash(&data[..size as usize]).into(); - u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]]) - }; - env.process - .inner - .0 - .lock() - .unwrap() - .snapshot_memory_hash - .insert(region.into(), hash); - - // Shift the data pointer - data = &data[size as usize..]; + + #[cfg(not(feature = "sys"))] + { + let decompressed_data = decompress(compressed_data, uncompressed_size)?; + memory + .write(region.start, &decompressed_data) + .map_err(|err| WasiRuntimeError::Runtime(RuntimeError::user(err.into())))?; + + // Break the region down into chunks that align with the resolution + let mut decompressed_data = &decompressed_data[..]; + let mut offset = region.start; + while offset < region.end { + let next = region.end.min(offset + MEMORY_REGION_RESOLUTION); + let region = offset..next; + offset = next; + + // Compute the hash and update it + let size = region.end - region.start; + let hash = { + let h: [u8; 32] = blake3::hash(&decompressed_data[..size as usize]).into(); + u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]]) + }; + env.process + .inner + .0 + .lock() + .unwrap() + .snapshot_memory_hash + .insert(region.into(), hash); + + // Shift the data pointer + decompressed_data = &decompressed_data[size as usize..]; + } + } + + #[cfg(feature = "sys")] + unsafe { + let start = region.start as usize; + let end = start + uncompressed_size; + decompress_into( + compressed_data, + &mut memory.data_unchecked_mut()[start..end], + )?; + + // Break the region down into chunks that align with the resolution + let data = &memory.data_unchecked()[start..end]; + let mut offset = region.start; + while offset < region.end { + let next = region.end.min(offset + MEMORY_REGION_RESOLUTION); + let region = offset..next; + + // Compute the hash and update it + let hash = { + let h: [u8; 32] = blake3::hash(&data[offset as usize..next as usize]).into(); + u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]]) + }; + env.process + .inner + .0 + .lock() + .unwrap() + .snapshot_memory_hash + .insert(region.into(), hash); + + offset = next; + } } Ok(()) diff --git a/lib/wasix/src/journal/effector/mod.rs b/lib/wasix/src/journal/effector/mod.rs index 632207a8adb..45fb3f7767e 100644 --- a/lib/wasix/src/journal/effector/mod.rs +++ b/lib/wasix/src/journal/effector/mod.rs @@ -2,7 +2,7 @@ pub(super) use std::{borrow::Cow, ops::Range, 
sync::MutexGuard, time::SystemTime pub(super) use anyhow::bail; pub(super) use bytes::Bytes; -pub(super) use wasmer::{FunctionEnvMut, RuntimeError, WasmPtr}; +pub(super) use wasmer::{FunctionEnvMut, WasmPtr}; pub(super) use wasmer_types::MemorySize; pub(super) use wasmer_wasix_types::{ types::__wasi_ciovec_t, @@ -17,7 +17,7 @@ pub(super) use crate::{ os::task::process::WasiProcessInner, syscalls::{fd_write_internal, FdWriteSource}, utils::map_snapshot_err, - WasiEnv, WasiRuntimeError, WasiThreadId, + WasiEnv, WasiThreadId, }; use super::*; diff --git a/lib/wasix/src/runtime/task_manager/tokio.rs b/lib/wasix/src/runtime/task_manager/tokio.rs index 39408896890..c524134c6e0 100644 --- a/lib/wasix/src/runtime/task_manager/tokio.rs +++ b/lib/wasix/src/runtime/task_manager/tokio.rs @@ -110,6 +110,7 @@ impl Default for TokioTaskManager { } } +#[allow(dead_code)] struct TokioRuntimeGuard<'g> { #[allow(unused)] inner: tokio::runtime::EnterGuard<'g>, diff --git a/lib/wasix/src/state/mod.rs b/lib/wasix/src/state/mod.rs index 4f20aea3f0a..6d1dce47069 100644 --- a/lib/wasix/src/state/mod.rs +++ b/lib/wasix/src/state/mod.rs @@ -53,6 +53,7 @@ pub(crate) use handles::*; /// all the rights enabled pub const ALL_RIGHTS: Rights = Rights::all(); +#[allow(dead_code)] struct WasiStateOpener { root_fs: WasiFsRoot, } diff --git a/lib/wasix/src/state/types.rs b/lib/wasix/src/state/types.rs index ac909d6bb70..9508bf3bf0f 100644 --- a/lib/wasix/src/state/types.rs +++ b/lib/wasix/src/state/types.rs @@ -95,6 +95,7 @@ impl PollEventBuilder { } } +#[allow(dead_code)] pub trait WasiPath {} /* diff --git a/lib/wasix/src/syscalls/journal/actions/update_memory.rs b/lib/wasix/src/syscalls/journal/actions/update_memory.rs index 743183a2186..e12d9da7ec9 100644 --- a/lib/wasix/src/syscalls/journal/actions/update_memory.rs +++ b/lib/wasix/src/syscalls/journal/actions/update_memory.rs @@ -2,10 +2,10 @@ use super::*; impl<'a, 'c> JournalSyscallPlayer<'a, 'c> { #[allow(clippy::result_large_err)] - pub(crate) unsafe fn action_update_memory( + pub(crate) unsafe fn action_update_compressed_memory( &mut self, region: Range, - data: Cow<'a, [u8]>, + compressed_data: Cow<'a, [u8]>, differ_ethereal: Option<&mut Vec>>, ) -> Result<(), WasiRuntimeError> { if Some(&self.cur_module_hash) != self.journal_module_hash.as_ref() { @@ -15,13 +15,16 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> { if self.bootstrapping { tracing::trace!("Differ(stage) journal - UpdateMemory"); - self.staged_differ_memory.push((region, data)); + self.staged_differ_memory.push((region, compressed_data)); } else if let Some(differ_ethereal) = differ_ethereal { tracing::trace!("Differ(ether) journal - UpdateMemory"); - differ_ethereal.push(JournalEntry::UpdateMemoryRegionV1 { region, data }); + differ_ethereal.push(JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data, + }); } else { tracing::trace!("Replay journal - UpdateMemory"); - JournalEffector::apply_memory(&mut self.ctx, region, &data) + JournalEffector::apply_compressed_memory(&mut self.ctx, region, &compressed_data) .map_err(anyhow_err_to_runtime_err)?; } Ok(()) diff --git a/lib/wasix/src/syscalls/journal/play_event.rs b/lib/wasix/src/syscalls/journal/play_event.rs index be62674723f..facece86616 100644 --- a/lib/wasix/src/syscalls/journal/play_event.rs +++ b/lib/wasix/src/syscalls/journal/play_event.rs @@ -49,8 +49,11 @@ impl<'a, 'c> JournalSyscallPlayer<'a, 'c> { self.action_fd_seek(fd, offset, whence)?; } } - JournalEntry::UpdateMemoryRegionV1 { region, data } => { - 
self.action_update_memory(region, data, differ_ethereal)?; + JournalEntry::UpdateMemoryRegionV1 { + region, + compressed_data, + } => { + self.action_update_compressed_memory(region, compressed_data, differ_ethereal)?; } JournalEntry::CloseThreadV1 { id, exit_code } => { self.action_close_thread(id, exit_code, differ_ethereal)?; diff --git a/lib/wasix/src/syscalls/journal/restore_snapshot.rs b/lib/wasix/src/syscalls/journal/restore_snapshot.rs index 6535ebf4353..96457957b1b 100644 --- a/lib/wasix/src/syscalls/journal/restore_snapshot.rs +++ b/lib/wasix/src/syscalls/journal/restore_snapshot.rs @@ -54,7 +54,7 @@ pub unsafe fn restore_snapshot( region, data.len() ); - JournalEffector::apply_memory(&mut runner.ctx, region, &data) + JournalEffector::apply_compressed_memory(&mut runner.ctx, region, &data) .map_err(anyhow_err_to_runtime_err)?; }
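The sketches below are not part of the change set; they illustrate how the pieces introduced above fit together, and any name, path or constant that does not appear in the diff is an assumption for illustration only.

The new `wasmer journal extract <journal> memory <memory-file>` subcommand wires `LogFileJournal`, `MemFileJournal` and `copy_journal` together. The same flow as a library call, assuming the re-exports used by the CLI code above; paths are hypothetical:

use std::path::Path;

use wasmer_wasix::journal::{copy_journal, LogFileJournal, MemFileJournal};

fn extract_memory_image() -> anyhow::Result<()> {
    // Open the journal produced by a previous journaled run.
    let journal = LogFileJournal::new(Path::new("./app.journal"))?;

    // The memory-file journal only reacts to memory mutations: every
    // UpdateMemoryRegionV1 record is decompressed and written at its region
    // offset inside ./app.mem, producing a flat memory image.
    let memory_file = MemFileJournal::new(Path::new("./app.mem"))?;

    // Replay every entry of the source journal into the memory file.
    copy_journal(&journal, &memory_file)?;
    Ok(())
}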
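`JournalEntry::UpdateMemoryRegionV1` now carries `compressed_data`, an LZ4 block with the uncompressed size prepended, instead of raw bytes. A sketch of producing and consuming such an entry with the same `lz4_flex` calls the diff uses (`compress_prepend_size`, `block::uncompressed_size`, `decompress`); the helper functions are hypothetical and the destination slice is assumed to be large enough:

use std::borrow::Cow;

use lz4_flex::{block, compress_prepend_size, decompress};
use wasmer_wasix::journal::JournalEntry;

fn make_memory_entry(region: std::ops::Range<u64>, bytes: &[u8]) -> JournalEntry<'static> {
    JournalEntry::UpdateMemoryRegionV1 {
        region,
        // The prepended size lets readers recover the uncompressed length
        // without a separate field.
        compressed_data: Cow::Owned(compress_prepend_size(bytes)),
    }
}

fn apply_memory_entry(entry: &JournalEntry<'_>, memory: &mut [u8]) -> anyhow::Result<()> {
    if let JournalEntry::UpdateMemoryRegionV1 { region, compressed_data } = entry {
        // Split off the prepended size, then decompress the payload.
        let (uncompressed_size, payload) = block::uncompressed_size(compressed_data)?;
        let data = decompress(payload, uncompressed_size)?;
        memory[region.start as usize..region.start as usize + data.len()]
            .copy_from_slice(&data);
    }
    Ok(())
}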
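On the restore side, `apply_compressed_memory` has two paths: a portable one that decompresses into a `Vec` and writes it through the memory view, and a `sys`-only one that decompresses straight into guest memory with `decompress_into`. A standalone sketch of the zero-copy technique, with an ordinary byte buffer standing in for guest memory:

use lz4_flex::{block, decompress_into};

fn restore_region(compressed: &[u8], guest_memory: &mut Vec<u8>, start: usize) -> anyhow::Result<()> {
    // Recover the uncompressed length from the prepended size prefix.
    let (uncompressed_size, payload) = block::uncompressed_size(compressed)?;
    let end = start + uncompressed_size;

    // Make sure the destination covers the whole region before decompressing
    // into it (the real code grows the wasm memory instead).
    if guest_memory.len() < end {
        guest_memory.resize(end, 0);
    }
    decompress_into(payload, &mut guest_memory[start..end])?;
    Ok(())
}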
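Both the capture and restore loops chunk memory at `MEMORY_REGION_RESOLUTION` and key each chunk by the first eight bytes of a blake3 hash, which is what `snapshot_memory_hash` uses to decide whether a region changed. A sketch of that scheme; the constant value here is an assumption, not the one defined in the wasix crate:

// Assumed value for illustration; the real constant lives in the wasix crate.
const MEMORY_REGION_RESOLUTION: u64 = 128 * 1024;

fn region_hash(chunk: &[u8]) -> u64 {
    let h: [u8; 32] = blake3::hash(chunk).into();
    u64::from_be_bytes([h[0], h[1], h[2], h[3], h[4], h[5], h[6], h[7]])
}

fn hash_regions(memory: &[u8]) -> Vec<(std::ops::Range<u64>, u64)> {
    let mut out = Vec::new();
    let mut cur = 0u64;
    while cur < memory.len() as u64 {
        // Round up to the next resolution boundary, clamped to the memory size.
        let next = ((cur + MEMORY_REGION_RESOLUTION) / MEMORY_REGION_RESOLUTION)
            * MEMORY_REGION_RESOLUTION;
        let end = (memory.len() as u64).min(next);
        out.push((cur..end, region_hash(&memory[cur as usize..end as usize])));
        cur = end;
    }
    out
}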
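`VMOwnedMemory::new_with_file` (and the `VMSharedMemory` equivalent) let a memory image such as the one produced by `journal extract` back a linear memory directly. A sketch assuming the constructor returns `Result<_, MemoryError>` like `new`, with illustrative `MemoryType`/`MemoryStyle` values and a hypothetical file name:

use wasmer_types::{MemoryError, MemoryStyle, MemoryType, Pages};
use wasmer_vm::{MmapType, VMOwnedMemory};

fn memory_from_image() -> Result<VMOwnedMemory, MemoryError> {
    let backing_file = std::fs::OpenOptions::new()
        .read(true)
        .write(true)
        .open("./app.mem")
        .expect("failed to open the memory image");

    // 64 pages minimum, no maximum, not shared between wasm threads.
    let ty = MemoryType::new(Pages(64), None, false);
    let style = MemoryStyle::Dynamic { offset_guard_size: 0x1_0000 };

    // MmapType::Shared means guest writes are flushed back into ./app.mem when
    // the mapping is dropped; MmapType::Private keeps them in memory only.
    VMOwnedMemory::new_with_file(&ty, &style, backing_file, MmapType::Shared)
}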
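At the OS level, `MmapType::Shared` plus the new `sync_on_drop` flag amount to a file-backed `MAP_SHARED` mapping that is `msync`'d before it is unmapped, which is what `Mmap::drop` now does. A Unix-only sketch of that pattern with plain `libc` calls:

#[cfg(unix)]
fn shared_file_mapping(file: &std::fs::File, len: usize) -> std::io::Result<()> {
    use std::os::fd::AsRawFd;

    // Map the file instead of anonymous memory, shared so that stores become
    // visible to the file once flushed.
    let ptr = unsafe {
        libc::mmap(
            std::ptr::null_mut(),
            len,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_FILE | libc::MAP_SHARED,
            file.as_raw_fd(),
            0,
        )
    };
    if ptr == libc::MAP_FAILED {
        return Err(std::io::Error::last_os_error());
    }

    // ... use the mapping as ordinary memory here ...

    unsafe {
        // Push dirty pages back to the file, then release the mapping.
        if libc::msync(ptr, len, libc::MS_SYNC) != 0 {
            return Err(std::io::Error::last_os_error());
        }
        if libc::munmap(ptr, len) != 0 {
            return Err(std::io::Error::last_os_error());
        }
    }
    Ok(())
}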