Skip to content

Commit 93af12b

Browse files
committed
feat(subscriber): Reduce retention period to fit in max message size
If the initial update message would be too big for tokio-console's gRPC decoder, reduce the retention period and try again. Currently the default retention period is 1 hour. Over that period the retained data can easily grow beyond the maximum gRPC decode message size (4 MiB), at which point tokio-console won't connect. There's really no minimum safe duration for retention: it depends on how busy the app is and on how much trace data the runtime and tokio log. Here we repeatedly divide the retention period in half until the initial update fits in the message, giving up once the retention period reaches a floor of 1 second.
1 parent 96c65bd commit 93af12b

File tree

3 files changed

+56
-22
lines changed

3 files changed

+56
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console-subscriber/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
4646
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
4747
parking_lot_crate = { package = "parking_lot", version = "0.12", optional = true }
4848
humantime = "2.1.0"
49+
prost = "0.12"
4950
prost-types = "0.12.0"
5051

5152
# Required for recording:

console-subscriber/src/aggregator/mod.rs

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,34 @@
1-
use super::{Command, Event, Shared, Watch};
2-
use crate::{
3-
stats::{self, Unsent},
4-
ToProto, WatchRequest,
5-
};
6-
use console_api as proto;
7-
use proto::resources::resource;
8-
use tokio::sync::{mpsc, Notify};
9-
101
use std::{
112
sync::{
123
atomic::{AtomicBool, Ordering::*},
134
Arc,
145
},
156
time::{Duration, Instant},
167
};
8+
9+
use console_api as proto;
10+
use prost::Message;
11+
use proto::resources::resource;
12+
use tokio::sync::{mpsc, Notify};
1713
use tracing_core::{span::Id, Metadata};
1814

15+
use super::{Command, Event, Shared, Watch};
16+
use crate::{
17+
stats::{self, Unsent},
18+
ToProto, WatchRequest,
19+
};
20+
1921
mod id_data;
2022
mod shrink;
2123
use self::id_data::{IdData, Include};
2224
use self::shrink::{ShrinkMap, ShrinkVec};
2325

26+
/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
27+
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
28+
29+
/// The smallest amount we will shrink retention to in an attempt to fit data in MAX_MESSAGE_SIZE
30+
const MIN_RETENTION: Duration = Duration::from_secs(1);
31+
2432
/// Aggregates instrumentation traces and prepares state for the instrument
2533
/// server.
2634
///
@@ -274,24 +282,48 @@ impl Aggregator {
274282
/// Add the task subscription to the watchers after sending the first update
275283
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
276284
tracing::debug!("new instrument subscription");
277-
278-
let task_update = Some(self.task_update(Include::All));
279-
let resource_update = Some(self.resource_update(Include::All));
280-
let async_op_update = Some(self.async_op_update(Include::All));
281285
let now = Instant::now();
282286

283-
let update = &proto::instrument::Update {
284-
task_update,
285-
resource_update,
286-
async_op_update,
287-
now: Some(self.base_time.to_timestamp(now)),
288-
new_metadata: Some(proto::RegisterMetadata {
289-
metadata: (*self.all_metadata).clone(),
290-
}),
287+
let update = loop {
288+
let update = proto::instrument::Update {
289+
task_update: Some(self.task_update(Include::All)),
290+
resource_update: Some(self.resource_update(Include::All)),
291+
async_op_update: Some(self.async_op_update(Include::All)),
292+
now: Some(self.base_time.to_timestamp(now)),
293+
new_metadata: Some(proto::RegisterMetadata {
294+
metadata: (*self.all_metadata).clone(),
295+
}),
296+
};
297+
let el = update.encoded_len();
298+
if el < MAX_MESSAGE_SIZE {
299+
// normal case
300+
break Some(update);
301+
}
302+
// If the grpc message is bigger than tokio-console will accept throw away the oldest
303+
// inactive data and try again
304+
self.retention /= 2;
305+
self.cleanup_closed();
306+
tracing::debug!(
307+
retention = ?self.retention,
308+
message_size = el,
309+
max_message_size = MAX_MESSAGE_SIZE,
310+
"Message too big, reduced retention",
311+
);
312+
313+
if self.retention <= MIN_RETENTION {
314+
self.retention = MIN_RETENTION;
315+
break None;
316+
}
291317
};
318+
if update.is_none() {
319+
tracing::error!(min_retention = ?MIN_RETENTION, "Message too big. Start with smaller retention.");
320+
// User will only get updates
321+
self.watchers.push(subscription);
322+
return;
323+
}
292324

293325
// Send the initial state --- if this fails, the subscription is already dead
294-
if subscription.update(update) {
326+
if subscription.update(&update.unwrap()) {
295327
self.watchers.push(subscription)
296328
}
297329
}

0 commit comments

Comments
 (0)