Skip to content

Commit

Permalink
fix: fix span leakage (near#7172)
Browse files Browse the repository at this point in the history
So what hapens is this: broadcast_validated_edges_trigger is trigger,
which means that's a callback which gets executing on actor's event loop
from time to time.

In this trigger, we create a span. We also execute an async operation
(sendign AddVerifiedEdges message). We want the spans to connect, so we
use parent: &span in the callback.

The problem here though is that, because we move the original entered
span into the closure, the span actually remains active even after
broadcast_validated_edges_trigger function returns! We have a span leak
of sorts, and whatever else ends up being executed on the actor loop
(inlcuding subsequent calls to the trigger!) would have this run-away
span on the stack. So we get a very deep (and malformed) stack of the
spans. And then eventually it stack overflows.

This is essentially the same problem as the one in
https://onesignal.com/blog/solving-memory-leaks-in-rust/.

We fix this by using the `Instrument` trait for the problematic future,
and the `let run_later_span =` pattern for `run_later` usages.

We *could* introduce a second helper for `run_later`, but I'd rather
avoid that here -- it seems that the whole `run_later` infra can be
replaced with tracing, but I'd rather not do that in this PR.
  • Loading branch information
matklad authored Jul 11, 2022
1 parent 02b8338 commit 771bcff
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, trace, warn, Span};
use tracing::{debug, error, info, trace, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// How often to request peers from active peers.
Expand Down Expand Up @@ -475,7 +475,7 @@ impl PeerManagerActor {
ctx: &mut Context<Self>,
interval: time::Duration,
) {
let span =
let _span =
tracing::trace_span!(target: "network", "broadcast_validated_edges_trigger").entered();
let start = self.clock.now();
let mut new_edges = Vec::new();
Expand Down Expand Up @@ -519,11 +519,11 @@ impl PeerManagerActor {

self.routing_table_addr
.send(routing::actor::Message::AddVerifiedEdges { edges: new_edges })
.in_current_span()
.into_actor(self)
.map(move |response, act, _ctx| {
let _span = tracing::trace_span!(
target: "network",
parent: &span,
"broadcast_validated_edges_trigger_response")
.entered();

Expand Down Expand Up @@ -572,7 +572,7 @@ impl PeerManagerActor {
throttle_controller: ThrottleController,
ctx: &mut Context<Self>,
) {
let span = tracing::trace_span!(target: "network", "register_peer").entered();
let _span = tracing::trace_span!(target: "network", "register_peer").entered();
debug!(target: "network", ?full_peer_info, "Consolidated connection");

if self.outgoing_peers.contains(&full_peer_info.peer_info.id) {
Expand Down Expand Up @@ -612,13 +612,12 @@ impl PeerManagerActor {
self.add_verified_edges_to_routing_table(vec![new_edge.clone()]);

let network_metrics = self.network_metrics.clone();
let run_later_span = tracing::trace_span!(target: "network", "RequestRoutingTableResponse");
near_performance_metrics::actix::run_later(
ctx,
WAIT_FOR_SYNC_DELAY.try_into().unwrap(),
move |act, ctx| {
let _span =
tracing::trace_span!(target: "network", parent: &span, "RequestRoutingTableResponse")
.entered();
let _guard = run_later_span.enter();
let known_edges = act.network_graph.read().edges().values().cloned().collect();
act.send_sync(
network_metrics,
Expand All @@ -643,14 +642,12 @@ impl PeerManagerActor {
new_edge: Edge,
known_edges: Vec<Edge>,
) {
let span = tracing::trace_span!(target: "network", "send_sync").entered();
let run_later_span = tracing::trace_span!(target: "network", "send_sync_attempt");
near_performance_metrics::actix::run_later(
ctx,
WAIT_FOR_SYNC_DELAY.try_into().unwrap(),
move |act, _ctx| {
let _span =
tracing::trace_span!(target: "network", parent: &span, "send_sync_attempt")
.entered();
let _guard = run_later_span.enter();
// Start syncing network point of view. Wait until both parties are connected before start
// sending messages.
let known_accounts = act.routing_table_view.get_announce_accounts();
Expand Down

0 comments on commit 771bcff

Please sign in to comment.