Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove need for tokio runtime for supporting serve #6043

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ tiny_http = { version = "0.12", default-features = false }
tinystl = { version = "0.0.3", default-features = false }
tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] }
tobj = "4.0"
tokio = { version = "1.24", default-features = false }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🥳

toml = { version = "0.8.10", default-features = false }
tracing = { version = "0.1", default-features = false }
tungstenite = { version = "0.20", default-features = false }
Expand Down
13 changes: 0 additions & 13 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,6 @@ impl RecordingStreamBuilder {
/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// web-based Rerun viewer via WebSockets.
///
/// This method needs to be called in a context where a Tokio runtime is already running (see
/// example below).
///
/// If the `open_browser` argument is `true`, your default browser will be opened with a
/// connected web-viewer.
///
Expand All @@ -481,16 +478,6 @@ impl RecordingStreamBuilder {
/// ## Example
///
/// ```ignore
/// // Ensure we have a running tokio runtime.
/// let mut tokio_runtime = None;
/// let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
/// handle
/// } else {
/// let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
/// tokio_runtime.get_or_insert(rt).handle().clone()
/// };
/// let _tokio_runtime_guard = tokio_runtime_handle.enter();
///
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .serve("0.0.0.0",
/// Default::default(),
Expand Down
2 changes: 0 additions & 2 deletions crates/re_sdk/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ impl crate::sink::LogSink for WebViewerSink {
/// NOTE: you can not connect one `Session` to another.
///
/// This function returns immediately.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
#[must_use = "the sink must be kept around to keep the servers running"]
pub fn new_sink(
open_browser: bool,
Expand Down
8 changes: 1 addition & 7 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ all-features = true
client = ["re_log_encoding/encoder"]

## Enable the server.
server = ["rand", "tokio", "re_log_encoding/decoder"]
server = ["rand", "re_log_encoding/decoder"]


[dependencies]
Expand All @@ -46,9 +46,3 @@ rand = { workspace = true, optional = true, features = [
"std_rng",
"small_rng",
] }

tokio = { workspace = true, optional = true, features = [
"io-util",
"net",
"rt",
] }
102 changes: 57 additions & 45 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{io::ErrorKind, time::Instant};
use std::{
io::{ErrorKind, Read as _},
net::{TcpListener, TcpStream},
time::Instant,
};

use rand::{Rng as _, SeedableRng};
use tokio::net::{TcpListener, TcpStream};

use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName};
use re_smart_channel::{Receiver, Sender};
Expand All @@ -13,6 +16,9 @@ pub enum ServerError {
bind_addr: String,
err: std::io::Error,
},

#[error(transparent)]
FailedToSpawnThread(#[from] std::io::Error),
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -68,12 +74,15 @@ impl Default for ServerOptions {
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let log_msg_rx = serve("0.0.0.0", re_sdk_comms::DEFAULT_SERVER_PORT, ServerOptions::default()).await.unwrap();
/// fn main() {
/// let log_msg_rx = serve("0.0.0.0", re_sdk_comms::DEFAULT_SERVER_PORT, ServerOptions::default()).unwrap();
/// }
/// ```
pub async fn serve(
///
/// Internally spawns a thread that listens for incoming TCP connections on the given `bind_ip` and `port`
/// and one thread per connected client.
// TODO(andreas): Reconsider if we should use `smol` tasks instead of threads both here and in re_ws_comms.
pub fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
Expand All @@ -85,13 +94,16 @@ pub async fn serve(
);

let bind_addr = format!("{bind_ip}:{port}");
let listener =
TcpListener::bind(&bind_addr)
.await
.map_err(|err| ServerError::TcpBindError {
bind_addr: bind_addr.clone(),
err,
})?;
let listener = TcpListener::bind(&bind_addr).map_err(|err| ServerError::TcpBindError {
bind_addr: bind_addr.clone(),
err,
})?;

std::thread::Builder::new()
.name("rerun_sdk_comms: listener".to_owned())
.spawn(move || {
listen_for_new_clients(&listener, options, &tx);
})?;

if options.quiet {
re_log::debug!(
Expand All @@ -103,19 +115,24 @@ pub async fn serve(
);
}

tokio::spawn(listen_for_new_clients(listener, options, tx));

Ok(rx)
}

async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, tx: Sender<LogMsg>) {
#[allow(clippy::infinite_loop)] // TODO(emilk): some way of aborting this loop
fn listen_for_new_clients(listener: &TcpListener, options: ServerOptions, tx: &Sender<LogMsg>) {
// TODO(emilk): some way of aborting this loop
#[allow(clippy::infinite_loop)]
loop {
match listener.accept().await {
match listener.accept() {
Ok((stream, _)) => {
let addr = stream.peer_addr().ok();
let tx = tx.clone_as(re_smart_channel::SmartMessageSource::TcpClient { addr });
spawn_client(stream, tx, options, addr);

std::thread::Builder::new()
.name("rerun_sdk_comms: client".to_owned())
.spawn(move || {
spawn_client(stream, &tx, options, addr);
})
.ok();
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
Expand All @@ -126,46 +143,41 @@ async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, t

fn spawn_client(
stream: TcpStream,
tx: Sender<LogMsg>,
tx: &Sender<LogMsg>,
options: ServerOptions,
peer_addr: Option<std::net::SocketAddr>,
) {
tokio::spawn(async move {
let addr_string =
peer_addr.map_or_else(|| "(unknown ip)".to_owned(), |addr| addr.to_string());
let addr_string = peer_addr.map_or_else(|| "(unknown ip)".to_owned(), |addr| addr.to_string());

if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}

if let Err(err) = run_client(stream, &tx, options).await {
if let ConnectionError::SendError(err) = &err {
if err.kind() == ErrorKind::UnexpectedEof {
// Client gracefully severed the connection.
tx.quit(None).ok(); // best-effort at this point
return;
}
if let Err(err) = run_client(stream, tx, options) {
if let ConnectionError::SendError(err) = &err {
if err.kind() == ErrorKind::UnexpectedEof {
// Client gracefully severed the connection.
tx.quit(None).ok(); // best-effort at this point
return;
}
re_log::warn_once!("Closing connection to client at {addr_string}: {err}");
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.to_string().into();
tx.quit(Some(err)).ok(); // best-effort at this point
}
});
re_log::warn_once!("Closing connection to client at {addr_string}: {err}");
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.to_string().into();
tx.quit(Some(err)).ok(); // best-effort at this point
}
}

async fn run_client(
fn run_client(
mut stream: TcpStream,
tx: &Sender<LogMsg>,
options: ServerOptions,
) -> Result<(), ConnectionError> {
#![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274

use tokio::io::AsyncReadExt as _;

let mut client_version = [0_u8; 2];
stream.read_exact(&mut client_version).await?;
stream.read_exact(&mut client_version)?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
Expand All @@ -190,11 +202,11 @@ async fn run_client(

loop {
let mut packet_size = [0_u8; 4];
stream.read_exact(&mut packet_size).await?;
stream.read_exact(&mut packet_size)?;
let packet_size = u32::from_le_bytes(packet_size);

packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet).await?;
stream.read_exact(&mut packet)?;

re_log::trace!("Received packet of size {packet_size}.");

Expand Down
4 changes: 0 additions & 4 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ pub enum WebViewerServerError {

#[error("Failed to create server at address {0}: {1}")]
CreateServerFailed(String, Box<dyn std::error::Error + Send + Sync + 'static>),

#[cfg(feature = "sync")]
#[error("Failed to spawn web viewer thread: {0}")]
ThreadSpawnFailed(#[from] std::io::Error),
}

// ----------------------------------------------------------------------------
Expand Down
1 change: 0 additions & 1 deletion crates/rerun-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ rerun = { workspace = true, features = [

document-features.workspace = true
mimalloc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }


[build-dependencies]
Expand Down
5 changes: 2 additions & 3 deletions crates/rerun-cli/src/bin/rerun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ use re_memory::AccountingAllocator;
static GLOBAL: AccountingAllocator<mimalloc::MiMalloc> =
AccountingAllocator::new(mimalloc::MiMalloc);

#[tokio::main]
async fn main() -> std::process::ExitCode {
fn main() -> std::process::ExitCode {
re_log::setup_logging();

let build_info = re_build_info::build_info!();

let result = rerun::run(build_info, rerun::CallSource::Cli, std::env::args()).await;
let result = rerun::run(build_info, rerun::CallSource::Cli, std::env::args());

match result {
Ok(exit_code) => std::process::ExitCode::from(exit_code),
Expand Down
9 changes: 1 addition & 8 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ analytics = [
]

## Integration with `clap`.
clap = ["dep:clap", "dep:tokio"]
clap = ["dep:clap"]

## Support for using Rerun's data-loaders directly from the SDK.
##
Expand Down Expand Up @@ -84,7 +84,6 @@ run = [
"dep:re_log_encoding",
"dep:re_sdk_comms",
"dep:re_ws_comms",
"dep:tokio",
]

## Support for running a TCP server that listens to incoming log messages from a Rerun SDK.
Expand Down Expand Up @@ -143,12 +142,6 @@ rayon.workspace = true

# Native, optional:
clap = { workspace = true, optional = true, features = ["derive"] }
tokio = { workspace = true, optional = true, features = [
"macros",
"rt-multi-thread",
"time",
] }


[build-dependencies]
re_build_tools.workspace = true
Loading
Loading