Skip to content

Commit

Permalink
Remove need for tokio runtime for supporting serve (#6043)
Browse files Browse the repository at this point in the history
### What

Rust users now no longer need to ensure to have tokio runtime set up
when using `serve`.

* Fixes #5907
* Direct follow-up / merges into #6042

Last PR in a series of PRs for removal of the tokio runtime:
* removes need for tokio in re_sdk_comms
*similar to the refactor in re_ws_comms only in spirit - a lot simpler:
no fancy broadcast, no other libraries involved. Just a bunch of threads
hammering on blocking sockets).
* remove remaining usages of async in this context
* removes tokio need from all documentation

-----

Dependency count `0.15.1` -> last friday `d90ed2f7e`-> `this pr`

* `cargo build -p rerun --no-default-features` 225 -> 274 -> 275
dependencies
* `cargo build -p rerun -F default` 361 -> 364 -> 364 dependencies
* `cargo build -p rerun -F web_viewer` 409 -> 412 -> 374 dependencies

Notes:
* 0.15.1 already regressed quite a bit compared back to January:
#4824
* we currently have a few temporary crates while migrating to re_query2
which weren't present in 0.15

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
* Using examples from latest `main` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6043?manifest_url=https://app.rerun.io/version/main/examples_manifest.json)
* Using full set of examples from `nightly` build:
[rerun.io/viewer](https://rerun.io/viewer/pr/6043?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG
* [x] If applicable, add a new check to the [release
checklist](https://github.com/rerun-io/rerun/blob/main/tests/python/release_checklist)!

- [PR Build Summary](https://build.rerun.io/pr/6043)
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)

To run all checks from `main`, comment on the PR with `@rerun-bot
full-check`.
  • Loading branch information
Wumpf authored Apr 22, 2024
1 parent f70d675 commit 2f73825
Show file tree
Hide file tree
Showing 23 changed files with 83 additions and 196 deletions.
36 changes: 0 additions & 36 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ tiny_http = { version = "0.12", default-features = false }
tinystl = { version = "0.0.3", default-features = false }
tinyvec = { version = "1.6", features = ["alloc", "rustc_1_55"] }
tobj = "4.0"
tokio = { version = "1.24", default-features = false }
toml = { version = "0.8.10", default-features = false }
tracing = { version = "0.1", default-features = false }
tungstenite = { version = "0.20", default-features = false }
Expand Down
13 changes: 0 additions & 13 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,9 +461,6 @@ impl RecordingStreamBuilder {
/// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
/// web-based Rerun viewer via WebSockets.
///
/// This method needs to be called in a context where a Tokio runtime is already running (see
/// example below).
///
/// If the `open_browser` argument is `true`, your default browser will be opened with a
/// connected web-viewer.
///
Expand All @@ -481,16 +478,6 @@ impl RecordingStreamBuilder {
/// ## Example
///
/// ```ignore
/// // Ensure we have a running tokio runtime.
/// let mut tokio_runtime = None;
/// let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
/// handle
/// } else {
/// let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
/// tokio_runtime.get_or_insert(rt).handle().clone()
/// };
/// let _tokio_runtime_guard = tokio_runtime_handle.enter();
///
/// let rec = re_sdk::RecordingStreamBuilder::new("rerun_example_app")
/// .serve("0.0.0.0",
/// Default::default(),
Expand Down
2 changes: 0 additions & 2 deletions crates/re_sdk/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ impl crate::sink::LogSink for WebViewerSink {
/// NOTE: you can not connect one `Session` to another.
///
/// This function returns immediately.
///
/// The caller needs to ensure that there is a `tokio` runtime running.
#[must_use = "the sink must be kept around to keep the servers running"]
pub fn new_sink(
open_browser: bool,
Expand Down
8 changes: 1 addition & 7 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ all-features = true
client = ["re_log_encoding/encoder"]

## Enable the server.
server = ["rand", "tokio", "re_log_encoding/decoder"]
server = ["rand", "re_log_encoding/decoder"]


[dependencies]
Expand All @@ -46,9 +46,3 @@ rand = { workspace = true, optional = true, features = [
"std_rng",
"small_rng",
] }

tokio = { workspace = true, optional = true, features = [
"io-util",
"net",
"rt",
] }
102 changes: 57 additions & 45 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{io::ErrorKind, time::Instant};
use std::{
io::{ErrorKind, Read as _},
net::{TcpListener, TcpStream},
time::Instant,
};

use rand::{Rng as _, SeedableRng};
use tokio::net::{TcpListener, TcpStream};

use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName};
use re_smart_channel::{Receiver, Sender};
Expand All @@ -13,6 +16,9 @@ pub enum ServerError {
bind_addr: String,
err: std::io::Error,
},

#[error(transparent)]
FailedToSpawnThread(#[from] std::io::Error),
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -68,12 +74,15 @@ impl Default for ServerOptions {
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// #[tokio::main]
/// async fn main() {
/// let log_msg_rx = serve("0.0.0.0", re_sdk_comms::DEFAULT_SERVER_PORT, ServerOptions::default()).await.unwrap();
/// fn main() {
/// let log_msg_rx = serve("0.0.0.0", re_sdk_comms::DEFAULT_SERVER_PORT, ServerOptions::default()).unwrap();
/// }
/// ```
pub async fn serve(
///
/// Internally spawns a thread that listens for incoming TCP connections on the given `bind_ip` and `port`
/// and one thread per connected client.
// TODO(andreas): Reconsider if we should use `smol` tasks instead of threads both here and in re_ws_comms.
pub fn serve(
bind_ip: &str,
port: u16,
options: ServerOptions,
Expand All @@ -85,13 +94,16 @@ pub async fn serve(
);

let bind_addr = format!("{bind_ip}:{port}");
let listener =
TcpListener::bind(&bind_addr)
.await
.map_err(|err| ServerError::TcpBindError {
bind_addr: bind_addr.clone(),
err,
})?;
let listener = TcpListener::bind(&bind_addr).map_err(|err| ServerError::TcpBindError {
bind_addr: bind_addr.clone(),
err,
})?;

std::thread::Builder::new()
.name("rerun_sdk_comms: listener".to_owned())
.spawn(move || {
listen_for_new_clients(&listener, options, &tx);
})?;

if options.quiet {
re_log::debug!(
Expand All @@ -103,19 +115,24 @@ pub async fn serve(
);
}

tokio::spawn(listen_for_new_clients(listener, options, tx));

Ok(rx)
}

async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, tx: Sender<LogMsg>) {
#[allow(clippy::infinite_loop)] // TODO(emilk): some way of aborting this loop
fn listen_for_new_clients(listener: &TcpListener, options: ServerOptions, tx: &Sender<LogMsg>) {
// TODO(emilk): some way of aborting this loop
#[allow(clippy::infinite_loop)]
loop {
match listener.accept().await {
match listener.accept() {
Ok((stream, _)) => {
let addr = stream.peer_addr().ok();
let tx = tx.clone_as(re_smart_channel::SmartMessageSource::TcpClient { addr });
spawn_client(stream, tx, options, addr);

std::thread::Builder::new()
.name("rerun_sdk_comms: client".to_owned())
.spawn(move || {
spawn_client(stream, &tx, options, addr);
})
.ok();
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
Expand All @@ -126,46 +143,41 @@ async fn listen_for_new_clients(listener: TcpListener, options: ServerOptions, t

fn spawn_client(
stream: TcpStream,
tx: Sender<LogMsg>,
tx: &Sender<LogMsg>,
options: ServerOptions,
peer_addr: Option<std::net::SocketAddr>,
) {
tokio::spawn(async move {
let addr_string =
peer_addr.map_or_else(|| "(unknown ip)".to_owned(), |addr| addr.to_string());
let addr_string = peer_addr.map_or_else(|| "(unknown ip)".to_owned(), |addr| addr.to_string());

if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}

if let Err(err) = run_client(stream, &tx, options).await {
if let ConnectionError::SendError(err) = &err {
if err.kind() == ErrorKind::UnexpectedEof {
// Client gracefully severed the connection.
tx.quit(None).ok(); // best-effort at this point
return;
}
if let Err(err) = run_client(stream, tx, options) {
if let ConnectionError::SendError(err) = &err {
if err.kind() == ErrorKind::UnexpectedEof {
// Client gracefully severed the connection.
tx.quit(None).ok(); // best-effort at this point
return;
}
re_log::warn_once!("Closing connection to client at {addr_string}: {err}");
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.to_string().into();
tx.quit(Some(err)).ok(); // best-effort at this point
}
});
re_log::warn_once!("Closing connection to client at {addr_string}: {err}");
let err: Box<dyn std::error::Error + Send + Sync + 'static> = err.to_string().into();
tx.quit(Some(err)).ok(); // best-effort at this point
}
}

async fn run_client(
fn run_client(
mut stream: TcpStream,
tx: &Sender<LogMsg>,
options: ServerOptions,
) -> Result<(), ConnectionError> {
#![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274

use tokio::io::AsyncReadExt as _;

let mut client_version = [0_u8; 2];
stream.read_exact(&mut client_version).await?;
stream.read_exact(&mut client_version)?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
Expand All @@ -190,11 +202,11 @@ async fn run_client(

loop {
let mut packet_size = [0_u8; 4];
stream.read_exact(&mut packet_size).await?;
stream.read_exact(&mut packet_size)?;
let packet_size = u32::from_le_bytes(packet_size);

packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet).await?;
stream.read_exact(&mut packet)?;

re_log::trace!("Received packet of size {packet_size}.");

Expand Down
4 changes: 0 additions & 4 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ pub enum WebViewerServerError {

#[error("Failed to create server at address {0}: {1}")]
CreateServerFailed(String, Box<dyn std::error::Error + Send + Sync + 'static>),

#[cfg(feature = "sync")]
#[error("Failed to spawn web viewer thread: {0}")]
ThreadSpawnFailed(#[from] std::io::Error),
}

// ----------------------------------------------------------------------------
Expand Down
1 change: 0 additions & 1 deletion crates/rerun-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ rerun = { workspace = true, features = [

document-features.workspace = true
mimalloc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }


[build-dependencies]
Expand Down
5 changes: 2 additions & 3 deletions crates/rerun-cli/src/bin/rerun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ use re_memory::AccountingAllocator;
static GLOBAL: AccountingAllocator<mimalloc::MiMalloc> =
AccountingAllocator::new(mimalloc::MiMalloc);

#[tokio::main]
async fn main() -> std::process::ExitCode {
fn main() -> std::process::ExitCode {
re_log::setup_logging();

let build_info = re_build_info::build_info!();

let result = rerun::run(build_info, rerun::CallSource::Cli, std::env::args()).await;
let result = rerun::run(build_info, rerun::CallSource::Cli, std::env::args());

match result {
Ok(exit_code) => std::process::ExitCode::from(exit_code),
Expand Down
9 changes: 1 addition & 8 deletions crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ analytics = [
]

## Integration with `clap`.
clap = ["dep:clap", "dep:tokio"]
clap = ["dep:clap"]

## Support for using Rerun's data-loaders directly from the SDK.
##
Expand Down Expand Up @@ -84,7 +84,6 @@ run = [
"dep:re_log_encoding",
"dep:re_sdk_comms",
"dep:re_ws_comms",
"dep:tokio",
]

## Support for running a TCP server that listens to incoming log messages from a Rerun SDK.
Expand Down Expand Up @@ -143,12 +142,6 @@ rayon.workspace = true

# Native, optional:
clap = { workspace = true, optional = true, features = ["derive"] }
tokio = { workspace = true, optional = true, features = [
"macros",
"rt-multi-thread",
"time",
] }


[build-dependencies]
re_build_tools.workspace = true
Loading

0 comments on commit 2f73825

Please sign in to comment.