Skip to content

Commit f422d24

Browse files
committed
feat(subscriber): Reduce retention period to fit in max message size
If the initial update message would be too big for tokio-console's gRPC decoder, reduce the retention period and try again. Currently the default retention period is 1 hour. The data retained over that period can easily grow to more than the maximum gRPC decode message size (4 MiB), at which point tokio-console won't connect. There's really no minimum safe duration for retention: it depends on how busy the app is and on how much trace data the runtime and tokio log. Here we repeatedly divide the retention period in half until the initial update fits within the maximum message size, giving up (and sending only subsequent updates) once the retention period has shrunk to the publish interval.
1 parent 1656c79 commit f422d24

File tree

3 files changed

+52
-15
lines changed

3 files changed

+52
-15
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

console-subscriber/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
4646
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
4747
parking_lot_crate = { package = "parking_lot", version = "0.12", optional = true }
4848
humantime = "2.1.0"
49+
prost = "0.12"
4950
prost-types = "0.12.0"
5051

5152
# Required for recording:

console-subscriber/src/aggregator/mod.rs

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
};
88

99
use console_api as proto;
10+
use prost::Message;
1011
use proto::resources::resource;
1112
use tokio::sync::{mpsc, Notify};
1213
use tracing_core::{span::Id, Metadata};
@@ -22,6 +23,9 @@ mod shrink;
2223
use self::id_data::{IdData, Include};
2324
use self::shrink::{ShrinkMap, ShrinkVec};
2425

26+
/// Should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
27+
const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
28+
2529
/// Aggregates instrumentation traces and prepares state for the instrument
2630
/// server.
2731
///
@@ -278,26 +282,57 @@ impl Aggregator {
278282
/// Add the task subscription to the watchers after sending the first update
279283
fn add_instrument_subscription(&mut self, subscription: Watch<proto::instrument::Update>) {
280284
tracing::debug!("new instrument subscription");
281-
282-
let task_update = Some(self.task_update(Include::All));
283-
let resource_update = Some(self.resource_update(Include::All));
284-
let async_op_update = Some(self.async_op_update(Include::All));
285285
let now = Instant::now();
286286

287-
let update = &proto::instrument::Update {
288-
task_update,
289-
resource_update,
290-
async_op_update,
291-
now: Some(self.base_time.to_timestamp(now)),
292-
new_metadata: Some(proto::RegisterMetadata {
293-
metadata: (*self.all_metadata).clone(),
294-
}),
287+
let update = loop {
288+
let update = proto::instrument::Update {
289+
task_update: Some(self.task_update(Include::All)),
290+
resource_update: Some(self.resource_update(Include::All)),
291+
async_op_update: Some(self.async_op_update(Include::All)),
292+
now: Some(self.base_time.to_timestamp(now)),
293+
new_metadata: Some(proto::RegisterMetadata {
294+
metadata: (*self.all_metadata).clone(),
295+
}),
296+
};
297+
let el = update.encoded_len();
298+
if el < MAX_MESSAGE_SIZE {
299+
// normal case
300+
break Some(update);
301+
}
302+
// If the grpc message is bigger than tokio-console will accept, throw away the oldest
303+
// inactive data and try again
304+
self.retention /= 2;
305+
self.cleanup_closed();
306+
tracing::debug!(
307+
retention = ?self.retention,
308+
message_size = el,
309+
max_message_size = MAX_MESSAGE_SIZE,
310+
"Message too big, reduced retention",
311+
);
312+
313+
if self.retention <= self.publish_interval {
314+
self.retention = self.publish_interval;
315+
break None;
316+
}
295317
};
296318

297-
// Send the initial state --- if this fails, the subscription is already dead
298-
if subscription.update(update) {
299-
self.watchers.push(subscription)
319+
match update {
320+
// Send the initial state
321+
Some(update) => {
322+
if !subscription.update(&update) {
323+
// If sending the initial update fails, the subscription is already dead,
324+
// so don't add it to `watchers`.
325+
return;
326+
}
327+
}
328+
// User will only get updates.
329+
None => tracing::error!(
330+
min_retention = ?self.publish_interval,
331+
"Message too big. Start with smaller retention.",
332+
),
300333
}
334+
335+
self.watchers.push(subscription);
301336
}
302337

303338
fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {

0 commit comments

Comments
 (0)