Skip to content

Commit

Permalink
Revert "Handle ctrl+c to gracefully shutdown the server(s) (#1613)" (#…
Browse files Browse the repository at this point in the history
…1632)

This reverts commit d666c94.
  • Loading branch information
emilk authored Mar 21, 2023
1 parent 21e1a8a commit 93ab88b
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 219 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ arrow2 = "0.16"
arrow2_convert = "0.4.2"
clap = "4.0"
comfy-table = { version = "6.1", default-features = false }
ctrlc = { version = "3.0", features = ["termination"] }
ecolor = "0.21.0"
eframe = { version = "0.21.3", default-features = false }
egui = "0.21.0"
Expand Down
1 change: 0 additions & 1 deletion crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ bincode = "1.3"
crossbeam = "0.8"
document-features = "0.2"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio.workspace = true
123 changes: 56 additions & 67 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use rand::{Rng as _, SeedableRng};

use re_log_types::{LogMsg, TimePoint, TimeType, TimelineName};
use re_smart_channel::{Receiver, Sender};
use tokio::net::{TcpListener, TcpStream};

#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ServerOptions {
Expand All @@ -28,18 +27,37 @@ impl Default for ServerOptions {
}
}

async fn listen_for_new_clients(
port: u16,
options: ServerOptions,
tx: Sender<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) {
/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let log_msg_rx = serve(80, ServerOptions::default())?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(port: u16, options: ServerOptions) -> anyhow::Result<Receiver<LogMsg>> {
let bind_addr = format!("0.0.0.0:{port}");

let listener = TcpListener::bind(&bind_addr)
.await
.with_context(|| format!("Failed to bind TCP address {bind_addr:?}"))
.unwrap();
let listener = std::net::TcpListener::bind(&bind_addr)
.with_context(|| format!("Failed to bind TCP address {bind_addr:?} for our WS server."))?;

let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

std::thread::Builder::new()
.name("sdk-server".into())
.spawn(move || {
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
})
.expect("Failed to spawn thread");

if options.quiet {
re_log::debug!(
Expand All @@ -51,72 +69,43 @@ async fn listen_for_new_clients(
);
}

loop {
let incoming = tokio::select! {
res = listener.accept() => res,
_ = shutdown_rx.recv() => {
return;
}
};
match incoming {
Ok((stream, _)) => {
let tx = tx.clone();
spawn_client(stream, tx, options);
}
Err(err) => {
re_log::warn!("Failed to accept incoming SDK client: {err}");
}
}
}
}

/// Listen to multiple SDK:s connecting to us over TCP.
///
/// ``` no_run
/// # use re_sdk_comms::{serve, ServerOptions};
/// let (sender, receiver) = tokio::sync::broadcast::channel(1);
/// let log_msg_rx = serve(80, ServerOptions::default(), receiver)?;
/// # Ok::<(), anyhow::Error>(())
/// ```
pub fn serve(
port: u16,
options: ServerOptions,
shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<Receiver<LogMsg>> {
let (tx, rx) = re_smart_channel::smart_channel(re_smart_channel::Source::TcpServer { port });

tokio::spawn(listen_for_new_clients(port, options, tx, shutdown_rx));

Ok(rx)
}

fn spawn_client(stream: TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
tokio::spawn(async move {
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}
if let Err(err) = run_client(stream, &tx, options).await {
re_log::warn!("Closing connection to client: {err}");
}
});
fn spawn_client(stream: std::net::TcpStream, tx: Sender<LogMsg>, options: ServerOptions) {
std::thread::Builder::new()
.name(format!(
"sdk-server-client-handler-{:?}",
stream.peer_addr()
))
.spawn(move || {
let addr_string = stream
.peer_addr()
.map_or_else(|_| "(unknown ip)".to_owned(), |addr| addr.to_string());
if options.quiet {
re_log::debug!("New SDK client connected: {addr_string}");
} else {
re_log::info!("New SDK client connected: {addr_string}");
}

if let Err(err) = run_client(stream, &tx, options) {
re_log::warn!("Closing connection to client: {err}");
}
})
.expect("Failed to spawn thread");
}

async fn run_client(
mut stream: TcpStream,
fn run_client(
mut stream: std::net::TcpStream,
tx: &Sender<LogMsg>,
options: ServerOptions,
) -> anyhow::Result<()> {
#![allow(clippy::read_zero_byte_vec)] // false positive: https://github.com/rust-lang/rust-clippy/issues/9274

use tokio::io::AsyncReadExt as _;
use std::io::Read as _;

let mut client_version = [0_u8; 2];
stream.read_exact(&mut client_version).await?;
stream.read_exact(&mut client_version)?;
let client_version = u16::from_le_bytes(client_version);

match client_version.cmp(&crate::PROTOCOL_VERSION) {
Expand All @@ -143,11 +132,11 @@ async fn run_client(

loop {
let mut packet_size = [0_u8; 4];
stream.read_exact(&mut packet_size).await?;
stream.read_exact(&mut packet_size)?;
let packet_size = u32::from_le_bytes(packet_size);

packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet).await?;
stream.read_exact(&mut packet)?;

re_log::trace!("Received log message of size {packet_size}.");

Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ wgpu.workspace = true
arboard = { version = "3.2", default-features = false, features = [
"image-data",
] }
ctrlc.workspace = true
ctrlc = { version = "3.0", features = ["termination"] }
puffin_http = "0.11"
puffin.workspace = true

Expand Down
1 change: 0 additions & 1 deletion crates/re_web_viewer_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ analytics = ["dep:re_analytics"]
re_log.workspace = true

anyhow.workspace = true
ctrlc.workspace = true
document-features = "0.2"
futures-util = "0.3"
hyper = { version = "0.14", features = ["full"] }
Expand Down
11 changes: 2 additions & 9 deletions crates/re_web_viewer_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,8 @@ impl WebViewerServer {
Self { server }
}

pub async fn serve(
self,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
self.server
.with_graceful_shutdown(async {
shutdown_rx.recv().await.ok();
})
.await?;
pub async fn serve(self) -> anyhow::Result<()> {
self.server.await?;
Ok(())
}
}
11 changes: 1 addition & 10 deletions crates/re_web_viewer_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,8 @@ async fn main() {
re_log::setup_native_logging();
let port = 9090;
eprintln!("Hosting web-viewer on http://127.0.0.1:{port}");

// Shutdown server via Ctrl+C
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
ctrlc::set_handler(move || {
re_log::debug!("Ctrl-C detected - Closing web server.");
shutdown_tx.send(()).unwrap();
})
.expect("Error setting Ctrl-C handler");

re_web_viewer_server::WebViewerServer::new(port)
.serve(shutdown_rx)
.serve()
.await
.unwrap();
}
19 changes: 5 additions & 14 deletions crates/re_ws_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,15 @@ impl Server {
Ok(Self { listener })
}

/// Accept new connections until we get a message on `shutdown_rx`
pub async fn listen(
self,
rx: Receiver<LogMsg>,
mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
) -> anyhow::Result<()> {
/// Accept new connections forever
pub async fn listen(self, rx: Receiver<LogMsg>) -> anyhow::Result<()> {
use anyhow::Context as _;

let history = Arc::new(Mutex::new(Vec::new()));

let log_stream = to_broadcast_stream(rx, history.clone());

loop {
let (tcp_stream, _) = tokio::select! {
res = self.listener.accept() => res?,
_ = shutdown_rx.recv() => {
return Ok(());
}
};

while let Ok((tcp_stream, _)) = self.listener.accept().await {
let peer = tcp_stream
.peer_addr()
.context("connected streams should have a peer address")?;
Expand All @@ -70,6 +59,8 @@ impl Server {
history.clone(),
));
}

Ok(())
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/rerun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ backtrace = "0.3"
clap = { workspace = true, features = ["derive"] }
mimalloc.workspace = true
puffin_http = "0.11"
ctrlc.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

# Native unix dependencies:
Expand Down
21 changes: 2 additions & 19 deletions crates/rerun/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,6 @@ impl RerunArgs {
default_enabled: bool,
run: impl FnOnce(Session) + Send + 'static,
) -> anyhow::Result<()> {
// Ensure we have a running tokio runtime.
#[allow(unused_assignments)]
let mut tokio_runtime = None;
let tokio_runtime_handle = if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle
} else {
tokio_runtime =
Some(tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"));
tokio_runtime.as_ref().unwrap().handle().clone()
};
let _tokio_runtime_guard = tokio_runtime_handle.enter();

let (rerun_enabled, recording_info) = crate::SessionBuilder::new(application_id)
.default_enabled(default_enabled)
.finalize();
Expand Down Expand Up @@ -114,17 +102,12 @@ impl RerunArgs {
};

let session = Session::new(recording_info, sink);
let _sink = session.sink().clone(); // Keep sink (and potential associated servers) alive until the end of this function scope.
run(session);

#[cfg(feature = "web_viewer")]
if matches!(self.to_behavior(), Ok(RerunBehavior::Serve)) {
use anyhow::Context as _;

let mut shutdown_rx = crate::run::setup_ctrl_c_handler();
return tokio_runtime_handle
.block_on(async { shutdown_rx.recv().await })
.context("Failed to wait for shutdown signal.");
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C");
std::thread::sleep(std::time::Duration::from_secs(1_000_000_000));
}

Ok(())
Expand Down
Loading

1 comment on commit 93ab88b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 93ab88b Previous: 21e1a8a Ratio
datastore/insert/batch/rects/insert 561578 ns/iter (± 2597) 562191 ns/iter (± 5921) 1.00
datastore/latest_at/batch/rects/query 1835 ns/iter (± 7) 1847 ns/iter (± 11) 0.99
datastore/latest_at/missing_components/primary 287 ns/iter (± 0) 286 ns/iter (± 2) 1.00
datastore/latest_at/missing_components/secondaries 435 ns/iter (± 2) 429 ns/iter (± 6) 1.01
datastore/range/batch/rects/query 149861 ns/iter (± 693) 149548 ns/iter (± 1792) 1.00
mono_points_arrow/generate_message_bundles 47879621 ns/iter (± 1886339) 45990407 ns/iter (± 1489722) 1.04
mono_points_arrow/generate_messages 137169363 ns/iter (± 1461934) 137495539 ns/iter (± 1563706) 1.00
mono_points_arrow/encode_log_msg 167553474 ns/iter (± 724392) 164653203 ns/iter (± 1327437) 1.02
mono_points_arrow/encode_total 353938250 ns/iter (± 1999604) 353452942 ns/iter (± 2671652) 1.00
mono_points_arrow/decode_log_msg 186440540 ns/iter (± 1486591) 183691894 ns/iter (± 2402906) 1.01
mono_points_arrow/decode_message_bundles 72483886 ns/iter (± 1327749) 72330282 ns/iter (± 1488929) 1.00
mono_points_arrow/decode_total 253819470 ns/iter (± 2373640) 251881583 ns/iter (± 2667690) 1.01
batch_points_arrow/generate_message_bundles 325717 ns/iter (± 1740) 323779 ns/iter (± 4054) 1.01
batch_points_arrow/generate_messages 6446 ns/iter (± 36) 6298 ns/iter (± 77) 1.02
batch_points_arrow/encode_log_msg 350843 ns/iter (± 1606) 356368 ns/iter (± 2883) 0.98
batch_points_arrow/encode_total 699548 ns/iter (± 3266) 699632 ns/iter (± 10860) 1.00
batch_points_arrow/decode_log_msg 350701 ns/iter (± 1294) 348230 ns/iter (± 2636) 1.01
batch_points_arrow/decode_message_bundles 2085 ns/iter (± 16) 1996 ns/iter (± 29) 1.04
batch_points_arrow/decode_total 355695 ns/iter (± 1361) 354068 ns/iter (± 2608) 1.00
arrow_mono_points/insert 6878628005 ns/iter (± 16552789) 7027822425 ns/iter (± 16502592) 0.98
arrow_mono_points/query 1782943 ns/iter (± 13977) 1739671 ns/iter (± 14143) 1.02
arrow_batch_points/insert 2632437 ns/iter (± 14223) 2667029 ns/iter (± 9265) 0.99
arrow_batch_points/query 16046 ns/iter (± 203) 16187 ns/iter (± 65) 0.99
arrow_batch_vecs/insert 42444 ns/iter (± 236) 42456 ns/iter (± 100) 1.00
arrow_batch_vecs/query 387871 ns/iter (± 3203) 389272 ns/iter (± 1095) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.