Skip to content

Commit

Permalink
Add a memory limit to the web socket server
Browse files Browse the repository at this point in the history
  • Loading branch information
emilk committed Jan 2, 2024
1 parent 3b64675 commit e10c0ab
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 37 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 52 additions & 21 deletions crates/re_memory/src/memory_limit.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
/// Represents a limit in how much RAM to use for the entire process.
///
/// Different systems can chose to heed the memory limit in different ways,
/// e.g. by dropping old data when it is exceeded.
///
/// It is recommended that they log using [`re_log::info_once`] when they
/// drop data because a memory limit is reached.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MemoryLimit {
/// Limit in bytes.
///
Expand All @@ -9,39 +16,63 @@ pub struct MemoryLimit {
}

impl MemoryLimit {
/// No limit.
pub const UNLIMITED: Self = Self { limit: None };

/// Set the limit to some number of bytes.
pub fn from_bytes(max_bytes: u64) -> Self {
Self {
limit: Some(max_bytes as _),
}
}

/// Set the limit to some fraction (0-1) of the total available RAM.
pub fn from_fraction_of_total(fraction: f32) -> Self {
let total_memory = crate::total_ram_in_bytes();
if total_memory == 0 {
re_log::info!("Couldn't determine total available memory. Setting no memory limit.");
Self { limit: None }
} else {
let limit = (fraction as f64 * total_memory as f64).round();

re_log::debug!(
"Setting memory limit to {}, which is {}% of total available memory ({}).",
re_format::format_bytes(limit),
100.0 * fraction,
re_format::format_bytes(total_memory as _),
);

Self {
limit: Some(limit as _),
}
}
}

/// The limit can either be absolute (e.g. "16GB") or relative (e.g. "50%").
pub fn parse(limit: &str) -> Result<Self, String> {
if let Some(percentage) = limit.strip_suffix('%') {
let percentage = percentage
.parse::<f32>()
.map_err(|_err| format!("expected e.g. '50%', got {limit:?}"))?;

let total_memory = crate::total_ram_in_bytes();
if total_memory == 0 {
re_log::info!(
"Couldn't determine total available memory. Setting no memory limit."
);
Ok(Self { limit: None })
} else {
let limit = (total_memory as f64 * (percentage as f64 / 100.0)).round();

re_log::debug!(
"Setting memory limit to {}, which is {percentage}% of total available memory ({}).",
re_format::format_bytes(limit),
re_format::format_bytes(total_memory as _),
);

Ok(Self {
limit: Some(limit as _),
})
}
let fraction = percentage / 100.0;
Ok(Self::from_fraction_of_total(fraction))
} else {
re_format::parse_bytes(limit)
.map(|limit| Self { limit: Some(limit) })
.ok_or_else(|| format!("expected e.g. '16GB', got {limit:?}"))
}
}

#[inline]
pub fn is_limited(&self) -> bool {
self.limit.is_some()
}

#[inline]
pub fn is_unlimited(&self) -> bool {
self.limit.is_none()
}

/// Returns how large fraction of memory we should free to go down to the exact limit.
pub fn is_exceeded_by(&self, mem_use: &crate::MemoryUse) -> Option<f32> {
let limit = self.limit?;
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ re_build_info.workspace = true
re_log_encoding = { workspace = true, features = ["encoder"] }
re_log_types.workspace = true
re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }
re_types_core.workspace = true

Expand Down
2 changes: 2 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub use re_log_types::{
entity_path, ApplicationId, EntityPath, EntityPathPart, StoreId, StoreKind,
};

pub use re_memory::MemoryLimit;

pub use global::cleanup_if_forked_child;

#[cfg(not(target_arch = "wasm32"))]
Expand Down
24 changes: 22 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,15 @@ impl RecordingStreamBuilder {
///
/// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli`).
///
/// ## Details
/// This method will spawn two servers: one HTTPS server serving the Rerun Web Viewer `.html` and `.wasm` files,
/// and then one WebSocket server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
///
/// The WebSocket server will buffer all log data in memory so that late connecting viewers will get all the data.
/// You can limit the amount of data buffered by the WebSocket server with the `memory_limit` argument.
/// Once reached, the earliest logged data will be dropped.
/// Note that this means that timeless data may be dropped if logged early.
///
/// ## Example
///
/// ```ignore
Expand All @@ -469,7 +478,11 @@ impl RecordingStreamBuilder {
/// let _tokio_runtime_guard = tokio_runtime_handle.enter();
///
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .serve("0.0.0.0", Default::default(), Default::default(), true)?;
/// .serve("0.0.0.0",
/// Default::default(),
/// Default::default(),
/// re_sdk::MemoryLimit::from_fraction_of_total(0.25),
/// true)?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[cfg(feature = "web_viewer")]
Expand All @@ -478,11 +491,18 @@ impl RecordingStreamBuilder {
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
open_browser: bool,
) -> RecordingStreamResult<RecordingStream> {
let (enabled, store_info, batcher_config) = self.into_args();
if enabled {
let sink = crate::web_viewer::new_sink(open_browser, bind_ip, web_port, ws_port)?;
let sink = crate::web_viewer::new_sink(
open_browser,
bind_ip,
web_port,
ws_port,
server_memory_limit,
)?;
RecordingStream::new(store_info, batcher_config, sink)
} else {
re_log::debug!("Rerun disabled - call to serve() ignored");
Expand Down
4 changes: 4 additions & 0 deletions crates/re_sdk/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl WebViewerSink {
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
) -> Result<Self, WebViewerSinkError> {
// TODO(cmc): the sources here probably don't make much sense…
let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(
Expand All @@ -46,6 +47,7 @@ impl WebViewerSink {
re_smart_channel::ReceiveSet::new(vec![rerun_rx]),
bind_ip.to_owned(),
ws_port,
server_memory_limit,
)?;
let webviewer_server = WebViewerServerHandle::new(bind_ip, web_port)?;

Expand Down Expand Up @@ -126,11 +128,13 @@ pub fn new_sink(
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
) -> Result<Box<dyn crate::sink::LogSink>, WebViewerSinkError> {
Ok(Box::new(WebViewerSink::new(
open_browser,
bind_ip,
web_port,
ws_port,
server_memory_limit,
)?))
}
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum TimeControlCommand {
/// Settings set once at startup (e.g. via command-line options) and not serialized.
#[derive(Clone)]
pub struct StartupOptions {
/// When the total process RAM reaches this limit, we GC old data.
pub memory_limit: re_memory::MemoryLimit,

pub persist_state: bool,
Expand All @@ -62,7 +63,7 @@ pub struct StartupOptions {
impl Default for StartupOptions {
fn default() -> Self {
Self {
memory_limit: re_memory::MemoryLimit::default(),
memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.75),
persist_state: true,
is_in_notebook: false,

Expand Down
2 changes: 2 additions & 0 deletions crates/re_ws_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ tls = [


[dependencies]
re_format.workspace = true
re_log.workspace = true
re_log_types = { workspace = true, features = ["serde"] }
re_memory.workspace = true
re_tracing.workspace = true

anyhow.workspace = true
Expand Down
82 changes: 75 additions & 7 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,82 @@
//! In the future thing will be changed to a protocol where the clients can query
//! for specific data based on e.g. time.
use std::{net::SocketAddr, sync::Arc};
use std::{collections::VecDeque, net::SocketAddr, sync::Arc};

use futures_util::{SinkExt, StreamExt};
use parking_lot::Mutex;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, tungstenite::Error};

use re_log_types::LogMsg;
use re_memory::MemoryLimit;
use re_smart_channel::ReceiveSet;

use crate::{server_url, RerunServerError, RerunServerPort};

struct MessageQueue {
server_memory_limit: MemoryLimit,
messages: VecDeque<Arc<[u8]>>,
}

impl MessageQueue {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
Self {
server_memory_limit,
messages: Default::default(),
}
}

pub fn push(&mut self, msg: Arc<[u8]>) {
self.gc_if_using_too_much_ram();
self.messages.push_back(msg);
}

fn gc_if_using_too_much_ram(&mut self) {
re_tracing::profile_function!();

if let Some(limit) = self.server_memory_limit.limit {
let limit = limit as u64;
let bytes_used = self.messages.iter().map(|m| m.len()).sum::<usize>() as u64;

if limit < bytes_used {
re_tracing::profile_scope!("Drop messages");
re_log::info_once!(
"Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.",
re_format::format_bytes(limit as _)
);

let bytes_to_free = bytes_used - limit;

let mut bytes_dropped = 0;
let mut messages_dropped = 0;

while bytes_dropped < bytes_to_free {
if let Some(msg) = self.messages.pop_front() {
bytes_dropped += msg.len() as u64;
messages_dropped += 1;
} else {
break;
}
}

re_log::trace!(
"Dropped {} bytes in {messages_dropped} message(s)",
re_format::format_bytes(bytes_dropped as _)
);
}
}
}

pub fn to_vec(&self) -> VecDeque<Arc<[u8]>> {
re_tracing::profile_function!();
self.messages.clone()
}
}

/// Websocket host for relaying [`LogMsg`]s to a web viewer.
pub struct RerunServer {
server_memory_limit: MemoryLimit,
listener: TcpListener,
local_addr: std::net::SocketAddr,
}
Expand All @@ -30,7 +92,11 @@ impl RerunServer {
///
/// A `bind_ip` of `"0.0.0.0"` is a good default.
/// A port of 0 will let the OS choose a free port.
pub async fn new(bind_ip: String, port: RerunServerPort) -> Result<Self, RerunServerError> {
pub async fn new(
bind_ip: String,
port: RerunServerPort,
server_memory_limit: MemoryLimit,
) -> Result<Self, RerunServerError> {
let bind_addr = format!("{bind_ip}:{port}");

let listener = match TcpListener::bind(&bind_addr).await {
Expand All @@ -46,6 +112,7 @@ impl RerunServer {
};

let slf = Self {
server_memory_limit,
local_addr: listener.local_addr()?,
listener,
};
Expand All @@ -70,7 +137,7 @@ impl RerunServer {
rx: ReceiveSet<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), RerunServerError> {
let history = Arc::new(Mutex::new(Vec::new()));
let history = Arc::new(Mutex::new(MessageQueue::new(self.server_memory_limit)));

let log_stream = to_broadcast_stream(rx, history.clone());

Expand Down Expand Up @@ -126,13 +193,14 @@ impl RerunServerHandle {
rerun_rx: ReceiveSet<LogMsg>,
bind_ip: String,
requested_port: RerunServerPort,
server_memory_limit: MemoryLimit,
) -> Result<Self, RerunServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let rt = tokio::runtime::Handle::current();

let ws_server = rt.block_on(tokio::spawn(async move {
RerunServer::new(bind_ip, requested_port).await
RerunServer::new(bind_ip, requested_port, server_memory_limit).await
}))??;

let local_addr = ws_server.local_addr;
Expand All @@ -157,7 +225,7 @@ impl RerunServerHandle {

fn to_broadcast_stream(
log_rx: ReceiveSet<LogMsg>,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) -> tokio::sync::broadcast::Sender<Arc<[u8]>> {
let (tx, _) = tokio::sync::broadcast::channel(1024 * 1024);
let tx1 = tx.clone();
Expand Down Expand Up @@ -189,7 +257,7 @@ async fn accept_connection(
log_stream: tokio::sync::broadcast::Sender<Arc<[u8]>>,
_peer: SocketAddr,
tcp_stream: TcpStream,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) {
// let span = re_log::span!(
// re_log::Level::INFO,
Expand All @@ -211,7 +279,7 @@ async fn accept_connection(
async fn handle_connection(
log_stream: tokio::sync::broadcast::Sender<Arc<[u8]>>,
tcp_stream: TcpStream,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) -> tungstenite::Result<()> {
let ws_stream = accept_async(tcp_stream).await?;
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
Expand Down
Loading

0 comments on commit e10c0ab

Please sign in to comment.