Limit server memory #4636

Merged
merged 10 commits on Jan 4, 2024
Changes from 3 commits
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

73 changes: 52 additions & 21 deletions crates/re_memory/src/memory_limit.rs
@@ -1,4 +1,11 @@
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
/// Represents a limit on how much RAM to use for the entire process.
///
/// Different systems can choose to heed the memory limit in different ways,
/// e.g. by dropping old data when it is exceeded.
///
/// It is recommended that they log using [`re_log::info_once`] when they
/// drop data because a memory limit is reached.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct MemoryLimit {
/// Limit in bytes.
///
@@ -9,39 +16,63 @@ pub struct MemoryLimit {
}

impl MemoryLimit {
/// No limit.
pub const UNLIMITED: Self = Self { limit: None };

/// Set the limit to some number of bytes.
pub fn from_bytes(max_bytes: u64) -> Self {
Self {
limit: Some(max_bytes as _),
}
}

/// Set the limit to some fraction (0-1) of the total available RAM.
pub fn from_fraction_of_total(fraction: f32) -> Self {
let total_memory = crate::total_ram_in_bytes();
if total_memory == 0 {
re_log::info!("Couldn't determine total available memory. Setting no memory limit.");
Self { limit: None }
} else {
let limit = (fraction as f64 * total_memory as f64).round();

re_log::debug!(
"Setting memory limit to {}, which is {}% of total available memory ({}).",
re_format::format_bytes(limit),
100.0 * fraction,
re_format::format_bytes(total_memory as _),
);

Self {
limit: Some(limit as _),
}
}
}

/// The limit can either be absolute (e.g. "16GB") or relative (e.g. "50%").
pub fn parse(limit: &str) -> Result<Self, String> {
if let Some(percentage) = limit.strip_suffix('%') {
let percentage = percentage
.parse::<f32>()
.map_err(|_err| format!("expected e.g. '50%', got {limit:?}"))?;

let total_memory = crate::total_ram_in_bytes();
if total_memory == 0 {
re_log::info!(
"Couldn't determine total available memory. Setting no memory limit."
);
Ok(Self { limit: None })
} else {
let limit = (total_memory as f64 * (percentage as f64 / 100.0)).round();

re_log::debug!(
"Setting memory limit to {}, which is {percentage}% of total available memory ({}).",
re_format::format_bytes(limit),
re_format::format_bytes(total_memory as _),
);

Ok(Self {
limit: Some(limit as _),
})
}
let fraction = percentage / 100.0;
Ok(Self::from_fraction_of_total(fraction))
} else {
re_format::parse_bytes(limit)
.map(|limit| Self { limit: Some(limit) })
.ok_or_else(|| format!("expected e.g. '16GB', got {limit:?}"))
}
}

#[inline]
pub fn is_limited(&self) -> bool {
self.limit.is_some()
}

#[inline]
pub fn is_unlimited(&self) -> bool {
self.limit.is_none()
}

/// Returns how large a fraction of memory we should free to go down to the exact limit.
pub fn is_exceeded_by(&self, mem_use: &crate::MemoryUse) -> Option<f32> {
let limit = self.limit?;
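For reference, here is a hedged usage sketch of the `MemoryLimit` API changed above. The constructors and the `parse` signature are taken from the diff; the exact byte-count strings accepted by `re_format::parse_bytes` beyond the `"16GB"` hinted at in the error message are an assumption.

```rust
use re_memory::MemoryLimit;

fn main() {
    // Absolute limit, parsed from a human-readable byte count (assumed format).
    let absolute = MemoryLimit::parse("16GB").expect("valid absolute limit");
    assert!(absolute.is_limited());

    // Relative limit: a percentage of the total available RAM.
    let relative = MemoryLimit::parse("50%").expect("valid relative limit");

    // Equivalent to constructing it from a fraction directly.
    let from_fraction = MemoryLimit::from_fraction_of_total(0.5);
    assert_eq!(relative, from_fraction);

    // No limit at all.
    assert!(MemoryLimit::UNLIMITED.is_unlimited());
}
```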
1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
@@ -47,6 +47,7 @@ re_build_info.workspace = true
re_log_encoding = { workspace = true, features = ["encoder"] }
re_log_types.workspace = true
re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }
re_types_core.workspace = true

2 changes: 2 additions & 0 deletions crates/re_sdk/src/lib.rs
@@ -35,6 +35,8 @@ pub use re_log_types::{
entity_path, ApplicationId, EntityPath, EntityPathPart, StoreId, StoreKind,
};

pub use re_memory::MemoryLimit;

pub use global::cleanup_if_forked_child;

#[cfg(not(target_arch = "wasm32"))]
24 changes: 22 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
@@ -455,6 +455,15 @@ impl RecordingStreamBuilder {
///
/// If not, you can connect to this server using the `rerun` binary (`cargo install rerun-cli`).
///
/// ## Details
/// This method will spawn two servers: one HTTPS server serving the Rerun Web Viewer `.html` and `.wasm` files,
/// and then one WebSocket server that streams the log data to the web viewer (or to a native viewer, or to multiple viewers).
///
/// The WebSocket server will buffer all log data in memory so that late connecting viewers will get all the data.
/// You can limit the amount of data buffered by the WebSocket server with the `memory_limit` argument.
/// Once reached, the earliest logged data will be dropped.
/// Note that this means that timeless data may be dropped if logged early.
///
/// ## Example
///
/// ```ignore
@@ -469,7 +478,11 @@ impl RecordingStreamBuilder {
/// let _tokio_runtime_guard = tokio_runtime_handle.enter();
///
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .serve("0.0.0.0", Default::default(), Default::default(), true)?;
/// .serve("0.0.0.0",
/// Default::default(),
/// Default::default(),
/// re_sdk::MemoryLimit::from_fraction_of_total(0.25),
/// true)?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[cfg(feature = "web_viewer")]
@@ -478,11 +491,18 @@
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
open_browser: bool,
) -> RecordingStreamResult<RecordingStream> {
let (enabled, store_info, batcher_config) = self.into_args();
if enabled {
let sink = crate::web_viewer::new_sink(open_browser, bind_ip, web_port, ws_port)?;
let sink = crate::web_viewer::new_sink(
open_browser,
bind_ip,
web_port,
ws_port,
server_memory_limit,
)?;
RecordingStream::new(store_info, batcher_config, sink)
} else {
re_log::debug!("Rerun disabled - call to serve() ignored");
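The doc example above hard-codes `MemoryLimit::from_fraction_of_total(0.25)`. As a complement, here is a hedged sketch of deriving the same `server_memory_limit` argument from a user-facing string instead (the helper name, flag semantics, and fallback value are illustrative, not part of this PR):

```rust
// Hypothetical helper: turn e.g. a CLI flag value into the `server_memory_limit`
// argument that `serve(…)` now takes.
fn parse_server_memory_limit(arg: &str) -> re_sdk::MemoryLimit {
    re_sdk::MemoryLimit::parse(arg).unwrap_or_else(|err| {
        eprintln!("Invalid memory limit {arg:?} ({err}); falling back to 25% of RAM");
        re_sdk::MemoryLimit::from_fraction_of_total(0.25)
    })
}

fn main() {
    let limit = parse_server_memory_limit("75%");
    assert!(limit.is_limited());
}
```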
4 changes: 4 additions & 0 deletions crates/re_sdk/src/web_viewer.rs
@@ -35,6 +35,7 @@ impl WebViewerSink {
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
) -> Result<Self, WebViewerSinkError> {
// TODO(cmc): the sources here probably don't make much sense…
let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(
@@ -46,6 +47,7 @@
re_smart_channel::ReceiveSet::new(vec![rerun_rx]),
bind_ip.to_owned(),
ws_port,
server_memory_limit,
)?;
let webviewer_server = WebViewerServerHandle::new(bind_ip, web_port)?;

@@ -126,11 +128,13 @@ pub fn new_sink(
bind_ip: &str,
web_port: WebViewerServerPort,
ws_port: RerunServerPort,
server_memory_limit: re_memory::MemoryLimit,
) -> Result<Box<dyn crate::sink::LogSink>, WebViewerSinkError> {
Ok(Box::new(WebViewerSink::new(
open_browser,
bind_ip,
web_port,
ws_port,
server_memory_limit,
)?))
}
3 changes: 2 additions & 1 deletion crates/re_viewer/src/app.rs
@@ -36,6 +36,7 @@ enum TimeControlCommand {
/// Settings set once at startup (e.g. via command-line options) and not serialized.
#[derive(Clone)]
pub struct StartupOptions {
/// When the total process RAM reaches this limit, we GC old data.
pub memory_limit: re_memory::MemoryLimit,

pub persist_state: bool,
@@ -62,7 +63,7 @@ pub struct StartupOptions {
impl Default for StartupOptions {
fn default() -> Self {
Self {
memory_limit: re_memory::MemoryLimit::default(),
memory_limit: re_memory::MemoryLimit::from_fraction_of_total(0.75),
persist_state: true,
is_in_notebook: false,

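With this change the native viewer GCs old data once the process uses 75% of available RAM by default. Below is a hedged sketch of how such a check can be driven by `MemoryLimit::is_exceeded_by` (the `MemoryUse::capture()` call is assumed from `re_memory`'s existing API and is not part of this diff):

```rust
use re_memory::{MemoryLimit, MemoryUse};

fn main() {
    let limit = MemoryLimit::from_fraction_of_total(0.75);

    // Assumed API: snapshot of the current RAM use of this process.
    let mem_use = MemoryUse::capture();

    // `is_exceeded_by` returns the fraction of memory to free in order to get
    // back down to the exact limit (see memory_limit.rs above).
    if let Some(fraction_to_purge) = limit.is_exceeded_by(&mem_use) {
        println!("Should purge ~{:.0}% of buffered data", 100.0 * fraction_to_purge);
    } else {
        println!("Within the memory limit; nothing to purge.");
    }
}
```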
2 changes: 2 additions & 0 deletions crates/re_ws_comms/Cargo.toml
@@ -39,8 +39,10 @@ tls = [


[dependencies]
re_format.workspace = true
re_log.workspace = true
re_log_types = { workspace = true, features = ["serde"] }
re_memory.workspace = true
re_tracing.workspace = true

anyhow.workspace = true
82 changes: 75 additions & 7 deletions crates/re_ws_comms/src/server.rs
@@ -6,20 +6,82 @@
//! In the future this will be changed to a protocol where the clients can query
//! for specific data based on e.g. time.

use std::{net::SocketAddr, sync::Arc};
use std::{collections::VecDeque, net::SocketAddr, sync::Arc};

use futures_util::{SinkExt, StreamExt};
use parking_lot::Mutex;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, tungstenite::Error};

use re_log_types::LogMsg;
use re_memory::MemoryLimit;
use re_smart_channel::ReceiveSet;

use crate::{server_url, RerunServerError, RerunServerPort};

struct MessageQueue {
server_memory_limit: MemoryLimit,
messages: VecDeque<Arc<[u8]>>,
}

impl MessageQueue {
pub fn new(server_memory_limit: MemoryLimit) -> Self {
Self {
server_memory_limit,
messages: Default::default(),
}
}

pub fn push(&mut self, msg: Arc<[u8]>) {
self.gc_if_using_too_much_ram();
self.messages.push_back(msg);
}

fn gc_if_using_too_much_ram(&mut self) {
re_tracing::profile_function!();

if let Some(limit) = self.server_memory_limit.limit {
let limit = limit as u64;
let bytes_used = self.messages.iter().map(|m| m.len()).sum::<usize>() as u64;

if limit < bytes_used {
re_tracing::profile_scope!("Drop messages");
re_log::info_once!(
"Memory limit ({}) exceeded. Dropping old log messages from the server. Clients connecting after this will not see the full history.",
re_format::format_bytes(limit as _)
);

let bytes_to_free = bytes_used - limit;

let mut bytes_dropped = 0;
let mut messages_dropped = 0;

while bytes_dropped < bytes_to_free {
if let Some(msg) = self.messages.pop_front() {
bytes_dropped += msg.len() as u64;
messages_dropped += 1;
} else {
break;
}
}

re_log::trace!(
"Dropped {} bytes in {messages_dropped} message(s)",
re_format::format_bytes(bytes_dropped as _)
);
}
}
}

pub fn to_vec(&self) -> VecDeque<Arc<[u8]>> {
re_tracing::profile_function!();
self.messages.clone()
}
}

/// Websocket host for relaying [`LogMsg`]s to a web viewer.
pub struct RerunServer {
server_memory_limit: MemoryLimit,
listener: TcpListener,
local_addr: std::net::SocketAddr,
}
@@ -30,7 +92,11 @@ impl RerunServer {
///
/// A `bind_ip` of `"0.0.0.0"` is a good default.
/// A port of 0 will let the OS choose a free port.
pub async fn new(bind_ip: String, port: RerunServerPort) -> Result<Self, RerunServerError> {
pub async fn new(
bind_ip: String,
port: RerunServerPort,
server_memory_limit: MemoryLimit,
) -> Result<Self, RerunServerError> {
let bind_addr = format!("{bind_ip}:{port}");

let listener = match TcpListener::bind(&bind_addr).await {
@@ -46,6 +112,7 @@ impl RerunServer {
};

let slf = Self {
server_memory_limit,
local_addr: listener.local_addr()?,
listener,
};
@@ -70,7 +137,7 @@
rx: ReceiveSet<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> Result<(), RerunServerError> {
let history = Arc::new(Mutex::new(Vec::new()));
let history = Arc::new(Mutex::new(MessageQueue::new(self.server_memory_limit)));

let log_stream = to_broadcast_stream(rx, history.clone());

@@ -126,13 +193,14 @@ impl RerunServerHandle {
rerun_rx: ReceiveSet<LogMsg>,
bind_ip: String,
requested_port: RerunServerPort,
server_memory_limit: MemoryLimit,
) -> Result<Self, RerunServerError> {
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

let rt = tokio::runtime::Handle::current();

let ws_server = rt.block_on(tokio::spawn(async move {
RerunServer::new(bind_ip, requested_port).await
RerunServer::new(bind_ip, requested_port, server_memory_limit).await
}))??;

let local_addr = ws_server.local_addr;
@@ -157,7 +225,7 @@ impl RerunServerHandle {

fn to_broadcast_stream(
log_rx: ReceiveSet<LogMsg>,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) -> tokio::sync::broadcast::Sender<Arc<[u8]>> {
let (tx, _) = tokio::sync::broadcast::channel(1024 * 1024);
let tx1 = tx.clone();
@@ -189,7 +257,7 @@ async fn accept_connection(
log_stream: tokio::sync::broadcast::Sender<Arc<[u8]>>,
_peer: SocketAddr,
tcp_stream: TcpStream,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) {
// let span = re_log::span!(
// re_log::Level::INFO,
@@ -211,7 +279,7 @@
async fn handle_connection(
log_stream: tokio::sync::broadcast::Sender<Arc<[u8]>>,
tcp_stream: TcpStream,
history: Arc<Mutex<Vec<Arc<[u8]>>>>,
history: Arc<Mutex<MessageQueue>>,
) -> tungstenite::Result<()> {
let ws_stream = accept_async(tcp_stream).await?;
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
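The heart of the new buffering policy is the drop-oldest loop in `MessageQueue::gc_if_using_too_much_ram` above. Below is a self-contained sketch of the same logic with no `re_*` dependencies, minus the logging and profiling (the 1 MiB limit and 1 KiB messages are arbitrary, for illustration only):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

/// Drop the oldest messages until the buffer is back under `limit_bytes`.
/// Mirrors `MessageQueue::gc_if_using_too_much_ram`, without logging/profiling.
fn gc(messages: &mut VecDeque<Arc<[u8]>>, limit_bytes: u64) {
    let bytes_used: u64 = messages.iter().map(|m| m.len() as u64).sum();
    if bytes_used <= limit_bytes {
        return;
    }

    let bytes_to_free = bytes_used - limit_bytes;
    let mut bytes_dropped: u64 = 0;
    while bytes_dropped < bytes_to_free {
        match messages.pop_front() {
            Some(msg) => bytes_dropped += msg.len() as u64,
            None => break,
        }
    }
}

fn main() {
    let mut queue: VecDeque<Arc<[u8]>> = VecDeque::new();
    for _ in 0..2048 {
        queue.push_back(vec![0u8; 1024].into()); // each message is 1 KiB
        gc(&mut queue, 1024 * 1024); // keep at most ~1 MiB buffered
        // (the real `MessageQueue::push` runs the GC *before* appending)
    }

    let total: usize = queue.iter().map(|m| m.len()).sum();
    assert!(total <= 1024 * 1024);
    println!("buffered {total} bytes in {} messages", queue.len());
}
```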