Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple improvements for the journal load times #4706

Merged
merged 18 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions lib/cli/src/commands/journal/extract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::path::PathBuf;

use clap::Parser;
use wasmer_wasix::journal::{copy_journal, LogFileJournal};

use crate::commands::CliCommand;

// Argument bundle for the `memory` extraction target: names the file that
// will receive the guest memory image reconstructed from the journal.
// (Plain `//` comments are used deliberately so no clap help text changes.)
#[derive(Debug, Parser)]
pub struct CmdExtractWhatMemory {
    /// Path to the memory file that will be updated using this journal
    #[clap(index = 1)]
    memory_file_path: PathBuf,
}

/// What to extract from the journal
#[derive(clap::Subcommand, Debug)]
pub enum CmdExtractWhat {
    // Currently the only extractable element: the guest memory image,
    // materialized by replaying memory-mutation entries into a file.
    Memory(CmdExtractWhatMemory),
}

/// Extracts an element from the journal
#[derive(Debug, Parser)]
pub struct CmdJournalExtract {
    /// Path to the journal that the element will be extracted from
    // NOTE: the previous help text said "compacted" — a copy-paste from the
    // compact command; this command extracts, it does not compact.
    #[clap(index = 1)]
    journal_path: PathBuf,

    // Which element to extract (see `CmdExtractWhat`).
    #[clap(subcommand)]
    what: CmdExtractWhat,
}

impl CliCommand for CmdJournalExtract {
    type Output = ();

    // Opens the source journal, then copies the requested element out of it
    // into its target representation.
    fn run(self) -> Result<(), anyhow::Error> {
        let source = LogFileJournal::new(&self.journal_path)?;

        match self.what {
            CmdExtractWhat::Memory(memory_cmd) => {
                let target =
                    wasmer_wasix::journal::MemFileJournal::new(&memory_cmd.memory_file_path)?;
                copy_journal(&source, &target)?;
            }
        }
        Ok(())
    }
}
5 changes: 5 additions & 0 deletions lib/cli/src/commands/journal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::commands::CliCommand;

mod compact;
mod export;
mod extract;
mod filter;
mod import;
mod inspect;
Expand All @@ -10,6 +11,7 @@ mod mount;

pub use compact::*;
pub use export::*;
pub use extract::*;
pub use filter::*;
pub use import::*;
pub use inspect::*;
Expand All @@ -32,6 +34,8 @@ pub enum CmdJournal {
/// Mounts the journal at a particular directory
#[cfg(feature = "fuse")]
Mount(CmdJournalMount),
/// Extracts an element of a journal
Extract(CmdJournalExtract),
}

impl CliCommand for CmdJournal {
Expand All @@ -46,6 +50,7 @@ impl CliCommand for CmdJournal {
Self::Filter(cmd) => cmd.run(),
#[cfg(feature = "fuse")]
Self::Mount(cmd) => cmd.run(),
Self::Extract(cmd) => cmd.run(),
}
}
}
6 changes: 6 additions & 0 deletions lib/compiler-cranelift/src/translator/func_environ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub trait TargetEnvironment {
}

/// Get the size of a native pointer, in bytes.
#[allow(dead_code)]
fn pointer_bytes(&self) -> u8 {
self.target_config().pointer_bytes()
}
Expand Down Expand Up @@ -470,6 +471,7 @@ pub trait FuncEnvironment: TargetEnvironment {
}

/// Get the type of the global at the given index.
#[allow(dead_code)]
fn get_global_type(&self, global_index: GlobalIndex) -> Option<WasmerType>;

/// Push a local declaration on to the stack to track the type of locals.
Expand All @@ -479,14 +481,18 @@ pub trait FuncEnvironment: TargetEnvironment {
fn push_params_on_stack(&mut self, function_index: LocalFunctionIndex);

/// Get the type of the local at the given index.
#[allow(dead_code)]
fn get_local_type(&self, local_index: u32) -> Option<WasmerType>;

/// Get the types of all the current locals.
#[allow(dead_code)]
fn get_local_types(&self) -> &[WasmerType];

/// Get the type of the local at the given index.
#[allow(dead_code)]
fn get_function_type(&self, function_index: FunctionIndex) -> Option<&FunctionType>;

/// Get the type of a function with the given signature index.
#[allow(dead_code)]
fn get_function_sig(&self, sig_index: SignatureIndex) -> Option<&FunctionType>;
}
16 changes: 8 additions & 8 deletions lib/journal/src/concrete/archived.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use lz4_flex::block::compress_prepend_size;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use rkyv::ser::{ScratchSpace, Serializer};
use rkyv::{Archive, CheckBytes, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
Expand Down Expand Up @@ -443,13 +442,14 @@ impl<'a> JournalEntry<'a> {
JournalEntry::ClearEtherealV1 => {
serializer.serialize_value(&JournalEntryClearEtherealV1 {})
}
JournalEntry::UpdateMemoryRegionV1 { region, data } => {
serializer.serialize_value(&JournalEntryUpdateMemoryRegionV1 {
start: region.start,
end: region.end,
compressed_data: compress_prepend_size(data.as_ref()).into(),
})
}
JournalEntry::UpdateMemoryRegionV1 {
region,
compressed_data,
} => serializer.serialize_value(&JournalEntryUpdateMemoryRegionV1 {
start: region.start,
end: region.end,
compressed_data: compressed_data.into(),
}),
JournalEntry::ProcessExitV1 { exit_code } => {
serializer.serialize_value(&JournalEntryProcessExitV1 {
exit_code: exit_code.map(|e| e.into()),
Expand Down
3 changes: 1 addition & 2 deletions lib/journal/src/concrete/archived_from.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use lz4_flex::block::decompress_size_prepended;
use std::borrow::Cow;
use std::time::SystemTime;
use wasmer_wasix_types::wasi;
Expand Down Expand Up @@ -663,7 +662,7 @@ impl<'a> TryFrom<ArchivedJournalEntry<'a>> for JournalEntry<'a> {
},
) => Self::UpdateMemoryRegionV1 {
region: (*start)..(*end),
data: Cow::Owned(decompress_size_prepended(compressed_data.as_ref())?),
compressed_data: Cow::Borrowed(compressed_data.as_ref()),
},
ArchivedJournalEntry::ProcessExitV1(ArchivedJournalEntryProcessExitV1 {
exit_code,
Expand Down
87 changes: 81 additions & 6 deletions lib/journal/src/concrete/compacting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ struct State {
// Thread events are only maintained while the thread and the
// process are still running
thread_map: HashMap<u32, usize>,
// Sockets that are open and not yet closed are kept here
open_sockets: HashMap<Fd, DescriptorLookup>,
// Open pipes have two file descriptors that are associated with
// them. We keep track of both of them
open_pipes: HashMap<Fd, DescriptorLookup>,
// Any descriptors are assumed to be read only operations until
// they actually do something that changes the system
suspect_descriptors: HashMap<Fd, DescriptorLookup>,
Expand Down Expand Up @@ -123,6 +128,26 @@ impl State {
}
}
}
for (_, l) in self.open_sockets.iter() {
if let Some(d) = self.descriptors.get(l) {
for e in d.events.iter() {
filter.add_event_to_whitelist(*e);
}
for e in d.write_map.values() {
filter.add_event_to_whitelist(*e);
}
}
}
for (_, l) in self.open_pipes.iter() {
if let Some(d) = self.descriptors.get(l) {
for e in d.events.iter() {
filter.add_event_to_whitelist(*e);
}
for e in d.write_map.values() {
filter.add_event_to_whitelist(*e);
}
}
}
if has_threads {
for (_, l) in self.stdio_descriptors.iter() {
if let Some(d) = self.descriptors.get(l) {
Expand Down Expand Up @@ -181,6 +206,8 @@ impl CompactingJournal {
snapshots: Default::default(),
memory_map: Default::default(),
thread_map: Default::default(),
open_sockets: Default::default(),
open_pipes: Default::default(),
create_directory: Default::default(),
remove_directory: Default::default(),
create_trunc_file: Default::default(),
Expand Down Expand Up @@ -384,7 +411,14 @@ impl WritableJournal for CompactingJournalTx {
// Get the lookup
// (if its suspect then it will remove the entry and
// thus the entire branch of events it represents is discarded)
let mut skip = false;
let lookup = if matches!(&entry, JournalEntry::CloseFileDescriptorV1 { .. }) {
if state.open_sockets.remove(fd).is_some() {
skip = true;
}
if state.open_pipes.remove(fd).is_some() {
skip = true;
}
state.suspect_descriptors.remove(fd)
} else {
state.suspect_descriptors.get(fd).cloned()
Expand All @@ -393,11 +427,13 @@ impl WritableJournal for CompactingJournalTx {
.or_else(|| state.keep_descriptors.get(fd).cloned())
.or_else(|| state.stdio_descriptors.get(fd).cloned());

if let Some(lookup) = lookup {
let state = state.descriptors.entry(lookup).or_default();
state.events.push(event_index);
} else {
state.whitelist.insert(event_index);
if !skip {
if let Some(lookup) = lookup {
let state = state.descriptors.entry(lookup).or_default();
state.events.push(event_index);
} else {
state.whitelist.insert(event_index);
}
}
}
// Things that modify a file descriptor mean that it is
Expand All @@ -406,7 +442,15 @@ impl WritableJournal for CompactingJournalTx {
| JournalEntry::FileDescriptorAllocateV1 { fd, .. }
| JournalEntry::FileDescriptorSetFlagsV1 { fd, .. }
| JournalEntry::FileDescriptorSetTimesV1 { fd, .. }
| JournalEntry::FileDescriptorWriteV1 { fd, .. } => {
| JournalEntry::FileDescriptorWriteV1 { fd, .. }
| JournalEntry::SocketBindV1 { fd, .. }
| JournalEntry::SocketSendFileV1 { socket_fd: fd, .. }
| JournalEntry::SocketSendToV1 { fd, .. }
| JournalEntry::SocketSendV1 { fd, .. }
| JournalEntry::SocketSetOptFlagV1 { fd, .. }
| JournalEntry::SocketSetOptSizeV1 { fd, .. }
| JournalEntry::SocketSetOptTimeV1 { fd, .. }
| JournalEntry::SocketShutdownV1 { fd, .. } => {
// Its no longer suspect
if let Some(lookup) = state.suspect_descriptors.remove(fd) {
state.keep_descriptors.insert(*fd, lookup);
Expand All @@ -417,6 +461,8 @@ impl WritableJournal for CompactingJournalTx {
.suspect_descriptors
.get(fd)
.cloned()
.or_else(|| state.open_sockets.get(fd).cloned())
.or_else(|| state.open_pipes.get(fd).cloned())
.or_else(|| state.keep_descriptors.get(fd).cloned())
.or_else(|| state.stdio_descriptors.get(fd).cloned());

Expand Down Expand Up @@ -477,6 +523,35 @@ impl WritableJournal for CompactingJournalTx {
state.create_directory.remove(&path);
state.remove_directory.entry(path).or_insert(event_index);
}
// Pipes that remain open at the end will be added
JournalEntry::CreatePipeV1 { fd1, fd2, .. } => {
let lookup = DescriptorLookup(state.descriptor_seed);
state.descriptor_seed += 1;
state.open_pipes.insert(*fd1, lookup);

state
.descriptors
.entry(lookup)
.or_default()
.events
.push(event_index);

let lookup = DescriptorLookup(state.descriptor_seed);
state.descriptor_seed += 1;
state.open_pipes.insert(*fd2, lookup);
}
// Sockets that are accepted are suspect
JournalEntry::SocketAcceptedV1 { fd, .. } | JournalEntry::SocketOpenV1 { fd, .. } => {
let lookup = DescriptorLookup(state.descriptor_seed);
state.descriptor_seed += 1;
state.open_sockets.insert(*fd, lookup);
state
.descriptors
.entry(lookup)
.or_default()
.events
.push(event_index);
}
_ => {
// The fallthrough is to whitelist the event so that it will
// be reflected in the next compaction event
Expand Down
98 changes: 98 additions & 0 deletions lib/journal/src/concrete/mem_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::{
fs::File,
io::{Seek, Write},
path::Path,
sync::RwLock,
};

use lz4_flex::{block, decompress};

use super::*;

/// The memory file journal processes journal entries by writing any memory mutations
/// directly to a file. Later this can be used as a mounting target for resuming
/// a process without having to reload the journal from scratch.
#[derive(Debug)]
pub struct MemFileJournal {
    // Handle to the backing memory-image file. The RwLock serializes
    // mutations from `write`/`flush` against handle duplication in `Clone`.
    file: RwLock<File>,
}

impl MemFileJournal {
    /// Opens (or creates) the memory-image file at `path`.
    ///
    /// The file is opened for writing *without* truncation so an existing
    /// memory image can be incrementally updated by later journal entries.
    ///
    /// # Errors
    /// Returns an error if the file cannot be created or opened.
    pub fn new(path: impl AsRef<Path>) -> anyhow::Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            // Explicit: keep existing contents (clippy `suspicious_open_options`);
            // entries overwrite regions in place rather than rebuilding the file.
            .truncate(false)
            .open(path)?;
        Ok(Self {
            file: RwLock::new(file),
        })
    }
}

impl Drop for MemFileJournal {
    // Best-effort flush when the journal goes away; drop cannot report
    // errors, so any failure is deliberately discarded.
    fn drop(&mut self) {
        self.flush().ok();
    }
}

impl Clone for MemFileJournal {
    /// Clones the journal by duplicating the underlying OS file handle
    /// (`File::try_clone`); both handles refer to the same file.
    ///
    /// # Panics
    /// Panics if the lock is poisoned or the OS refuses to duplicate the
    /// handle. `Clone` cannot return a `Result`, so `expect` carries a
    /// diagnostic message instead of a bare `unwrap`.
    fn clone(&self) -> Self {
        let file = self.file.read().unwrap();
        Self {
            file: RwLock::new(
                file.try_clone()
                    .expect("failed to duplicate the memory file handle"),
            ),
        }
    }
}

impl ReadableJournal for MemFileJournal {
    // This journal only materializes memory mutations into a file; it keeps
    // no replayable entries, so reading always yields end-of-journal.
    fn read(&self) -> anyhow::Result<Option<LogReadResult<'_>>> {
        Ok(None)
    }

    // With nothing to read, a "restarted" view is simply a fresh clone
    // sharing the same backing file.
    fn as_restarted(&self) -> anyhow::Result<Box<DynReadableJournal>> {
        Ok(Box::new(self.clone()))
    }
}

impl WritableJournal for MemFileJournal {
    /// Applies a journal entry to the backing memory-image file.
    ///
    /// Only entries that affect guest memory are materialized:
    /// - `UpdateMemoryRegionV1`: the lz4 block-compressed payload is
    ///   decompressed and written at the region's start offset;
    /// - `ProcessExitV1` / `InitModuleV1`: the image is reset by truncating
    ///   the file to zero length.
    /// All other entry kinds are ignored.
    ///
    /// # Errors
    /// Fails if decompression or any file operation fails.
    fn write<'a>(&'a self, entry: JournalEntry<'a>) -> anyhow::Result<LogWriteResult> {
        let estimated_size = entry.estimate_size() as u64;
        match entry {
            JournalEntry::UpdateMemoryRegionV1 {
                region,
                compressed_data,
            } => {
                // The payload carries its uncompressed size as a prefix.
                let (uncompressed_size, compressed_data) =
                    block::uncompressed_size(&compressed_data)?;
                let decompressed_data = decompress(compressed_data, uncompressed_size)?;

                let mut file = self.file.write().unwrap();
                file.seek(std::io::SeekFrom::Start(region.start))?;
                file.write_all(&decompressed_data)?;
            }
            JournalEntry::ProcessExitV1 { .. } | JournalEntry::InitModuleV1 { .. } => {
                // Truncation mutates the file, so take the *write* lock to
                // exclude concurrent region writes. (The original took only
                // a read lock here, allowing truncations to race.)
                let file = self.file.write().unwrap();
                file.set_len(0)?;
            }
            _ => {}
        }

        Ok(LogWriteResult {
            record_start: 0,
            record_end: estimated_size,
        })
    }

    /// Flushes pending writes of the backing file to the operating system.
    fn flush(&self) -> anyhow::Result<()> {
        let mut file = self.file.write().unwrap();
        file.flush()?;
        Ok(())
    }
}

impl Journal for MemFileJournal {
    /// Splits the journal into writer/reader halves; each half is a clone
    /// holding its own duplicate of the underlying file handle.
    fn split(self) -> (Box<DynWritableJournal>, Box<DynReadableJournal>) {
        let writer: Box<DynWritableJournal> = Box::new(self.clone());
        let reader: Box<DynReadableJournal> = Box::new(self.clone());
        (writer, reader)
    }
}
Loading
Loading