diff --git a/console-api/src/tasks.rs b/console-api/src/tasks.rs
index 3d62eb420..1955ba5a0 100644
--- a/console-api/src/tasks.rs
+++ b/console-api/src/tasks.rs
@@ -1 +1,17 @@
 tonic::include_proto!("rs.tokio.console.tasks");
+
+// === IDs ===
+
+impl From<u64> for TaskId {
+    fn from(id: u64) -> Self {
+        TaskId { id }
+    }
+}
+
+impl From<TaskId> for u64 {
+    fn from(id: TaskId) -> Self {
+        id.id
+    }
+}
+
+impl Copy for TaskId {}
diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs
index 3ed0cf716..640963c24 100644
--- a/console-subscriber/src/aggregator.rs
+++ b/console-subscriber/src/aggregator.rs
@@ -6,7 +6,7 @@ use tokio::sync::{mpsc, Notify};
 use futures::FutureExt;
 use std::{
-    collections::HashMap,
+    collections::{hash_map::Entry, HashMap},
     convert::TryInto,
     ops::{Deref, DerefMut},
     sync::{
@@ -22,6 +22,8 @@ use hdrhistogram::{
     Histogram,
 };
 
+pub type TaskId = u64;
+
 pub(crate) struct Aggregator {
     /// Channel of incoming events emitted by `TaskLayer`s.
     events: mpsc::Receiver<Event>,
@@ -42,7 +44,7 @@ pub(crate) struct Aggregator {
     watchers: Vec<Watch<proto::tasks::TaskUpdate>>,
 
     /// Currently active RPCs streaming task details events, by task ID.
-    details_watchers: HashMap<span::Id, Vec<Watch<proto::tasks::TaskDetails>>>,
+    details_watchers: HashMap<TaskId, Vec<Watch<proto::tasks::TaskDetails>>>,
 
     /// *All* metadata for task spans and user-defined spans that we care about.
     ///
@@ -59,6 +61,12 @@ pub(crate) struct Aggregator {
 
     /// Map of task IDs to task stats.
     stats: TaskData<Stats>,
+
+    /// A counter for the pretty task IDs.
+    task_id_counter: TaskId,
+
+    /// A table that contains the span ID to pretty task ID mappings.
+    task_id_mappings: HashMap<span::Id, TaskId>,
 }
 
 #[derive(Debug)]
@@ -89,7 +97,7 @@ struct Stats {
 
 #[derive(Default)]
 struct TaskData<T> {
-    data: HashMap<span::Id, (T, bool)>,
+    data: HashMap<TaskId, (T, bool)>,
 }
 
 struct Task {
@@ -139,9 +147,11 @@ impl Aggregator {
             all_metadata: Vec::new(),
             new_metadata: Vec::new(),
             tasks: TaskData {
-                data: HashMap::<span::Id, (Task, bool)>::new(),
+                data: HashMap::<TaskId, (Task, bool)>::new(),
             },
             stats: TaskData::default(),
+            task_id_counter: 0,
+            task_id_mappings: HashMap::new(),
         }
     }
@@ -223,13 +233,13 @@ impl Aggregator {
             let new_tasks = self
                 .tasks
                 .all()
-                .map(|(id, task)| task.to_proto(id.clone()))
+                .map(|(&id, task)| task.to_proto(id))
                 .collect();
             let now = SystemTime::now();
            let stats_update = self
                 .stats
                 .all()
-                .map(|(id, stats)| (id.into_u64(), stats.to_proto()))
+                .map(|(&id, stats)| (id, stats.to_proto()))
                 .collect();
             // Send the initial state --- if this fails, the subscription is already dead
             if subscription.update(&proto::tasks::TaskUpdate {
@@ -256,8 +266,7 @@ impl Aggregator {
                     buffer,
                 } = watch_request;
                 tracing::debug!(id = ?id, "new task details subscription");
-                let task_id: span::Id = id.into();
-                if let Some(stats) = self.stats.get(&task_id) {
+                if let Some(stats) = self.stats.get(&id) {
                     let (tx, rx) = mpsc::channel(buffer);
                     let subscription = Watch(tx);
                     let now = SystemTime::now();
@@ -265,13 +274,13 @@ impl Aggregator {
                     // Then send the initial state --- if this fails, the subscription is already dead.
                     if stream_sender.send(rx).is_ok()
                         && subscription.update(&proto::tasks::TaskDetails {
-                            task_id: Some(task_id.clone().into()),
+                            task_id: Some(id.into()),
                             now: Some(now.into()),
                             poll_times_histogram: serialize_histogram(&stats.poll_times_histogram).ok(),
                         })
                     {
                         self.details_watchers
-                            .entry(task_id)
+                            .entry(id)
                             .or_insert_with(Vec::new)
                             .push(subscription);
                     }
@@ -294,13 +303,13 @@ impl Aggregator {
         let new_tasks = self
             .tasks
             .since_last_update()
-            .map(|(id, task)| task.to_proto(id.clone()))
+            .map(|(&id, task)| task.to_proto(id))
             .collect();
         let now = SystemTime::now();
         let stats_update = self
             .stats
             .since_last_update()
-            .map(|(id, stats)| (id.into_u64(), stats.to_proto()))
+            .map(|(&id, stats)| (id, stats.to_proto()))
             .collect();
 
         let update = proto::tasks::TaskUpdate {
@@ -315,10 +324,10 @@ impl Aggregator {
         let stats = &self.stats;
         // Assuming there are much fewer task details subscribers than there are
         // stats updates, iterate over `details_watchers` and compact the map.
-        self.details_watchers.retain(|id, watchers| {
-            if let Some(task_stats) = stats.get(id) {
+        self.details_watchers.retain(|&id, watchers| {
+            if let Some(task_stats) = stats.get(&id) {
                 let details = proto::tasks::TaskDetails {
-                    task_id: Some(id.clone().into()),
+                    task_id: Some(id.into()),
                     now: Some(now.into()),
                     poll_times_histogram: serialize_histogram(&task_stats.poll_times_histogram)
                         .ok(),
@@ -345,8 +354,9 @@ impl Aggregator {
                 at,
                 fields,
             } => {
+                let task_id = self.get_or_insert_task_id(id);
                 self.tasks.insert(
-                    id.clone(),
+                    task_id,
                     Task {
                         metadata,
                         fields,
@@ -354,7 +364,7 @@ impl Aggregator {
                     },
                 );
                 self.stats.insert(
-                    id,
+                    task_id,
                     Stats {
                         polls: 0,
                         created_at: Some(at),
@@ -363,7 +373,8 @@ impl Aggregator {
                 );
             }
             Event::Enter { id, at } => {
-                let mut stats = self.stats.update_or_default(id);
+                let task_id = self.get_or_insert_task_id(id);
+                let mut stats = self.stats.update_or_default(task_id);
                 if stats.current_polls == 0 {
                     stats.last_poll_started = Some(at);
                     if stats.first_poll == None {
@@ -375,7 +386,8 @@ impl Aggregator {
             }
             Event::Exit { id, at } => {
-                let mut stats = self.stats.update_or_default(id);
+                let task_id = self.get_or_insert_task_id(id);
+                let mut stats = self.stats.update_or_default(task_id);
                 stats.current_polls -= 1;
                 if stats.current_polls == 0 {
                     if let Some(last_poll_started) = stats.last_poll_started {
@@ -391,17 +403,19 @@ impl Aggregator {
             }
             Event::Close { id, at } => {
-                self.stats.update_or_default(id).closed_at = Some(at);
+                let task_id = self.get_or_insert_task_id(id);
+                self.stats.update_or_default(task_id).closed_at = Some(at);
             }
             Event::Waker { id, op, at } => {
+                let task_id = self.get_or_insert_task_id(id);
                 // It's possible for wakers to exist long after a task has
                 // finished. We don't want those cases to create a "new"
                 // task that isn't closed, just to insert some waker stats.
                 //
                 // It may be useful to eventually be able to report about
                 // "wasted" waker ops, but we'll leave that for another time.
-                if let Some(mut stats) = self.stats.update(&id) {
+                if let Some(mut stats) = self.stats.update(&task_id) {
                     match op {
                         WakeOp::Wake | WakeOp::WakeByRef => {
                             stats.wakes += 1;
@@ -431,6 +445,18 @@ impl Aggregator {
         }
     }
 
+    fn get_or_insert_task_id(&mut self, span_id: span::Id) -> TaskId {
+        match self.task_id_mappings.entry(span_id) {
+            Entry::Occupied(entry) => *entry.get(),
+            Entry::Vacant(entry) => {
+                let task_id = self.task_id_counter;
+                entry.insert(task_id);
+                self.task_id_counter = self.task_id_counter.wrapping_add(1);
+                task_id
+            }
+        }
+    }
+
     fn drop_closed_tasks(&mut self) {
         let tasks = &mut self.tasks;
         let stats = &mut self.stats;
@@ -510,22 +536,22 @@ impl Flush {
 }
 
 impl<T> TaskData<T> {
-    fn update_or_default(&mut self, id: span::Id) -> Updating<'_, T>
+    fn update_or_default(&mut self, id: TaskId) -> Updating<'_, T>
     where
         T: Default,
     {
         Updating(self.data.entry(id).or_default())
     }
 
-    fn update(&mut self, id: &span::Id) -> Option<Updating<'_, T>> {
+    fn update(&mut self, id: &TaskId) -> Option<Updating<'_, T>> {
         self.data.get_mut(id).map(Updating)
     }
 
-    fn insert(&mut self, id: span::Id, data: T) {
+    fn insert(&mut self, id: TaskId, data: T) {
         self.data.insert(id, (data, true));
     }
 
-    fn since_last_update(&mut self) -> impl Iterator<Item = (&span::Id, &mut T)> {
+    fn since_last_update(&mut self) -> impl Iterator<Item = (&TaskId, &mut T)> {
         self.data.iter_mut().filter_map(|(id, (data, dirty))| {
             if *dirty {
                 *dirty = false;
@@ -536,11 +562,11 @@ impl<T> TaskData<T> {
         })
     }
 
-    fn all(&self) -> impl Iterator<Item = (&span::Id, &T)> {
+    fn all(&self) -> impl Iterator<Item = (&TaskId, &T)> {
         self.data.iter().map(|(id, (data, _))| (id, data))
     }
 
-    fn get(&self, id: &span::Id) -> Option<&T> {
+    fn get(&self, id: &TaskId) -> Option<&T> {
         self.data.get(id).map(|(data, _)| data)
     }
 }
@@ -603,7 +629,7 @@ impl Stats {
 }
 
 impl Task {
-    fn to_proto(&self, id: span::Id) -> proto::tasks::Task {
+    fn to_proto(&self, id: u64) -> proto::tasks::Task {
         proto::tasks::Task {
             id: Some(id.into()),
             // TODO: more kinds of tasks...
diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs
index 5bcce258a..c12ceaeb1 100644
--- a/console-subscriber/src/lib.rs
+++ b/console-subscriber/src/lib.rs
@@ -1,5 +1,4 @@
 use console_api as proto;
-use proto::SpanId;
 use tokio::sync::{mpsc, oneshot};
 
 use std::{
@@ -26,6 +25,8 @@ use callsites::Callsites;
 
 pub use init::{build, init};
 
+use crate::aggregator::TaskId;
+
 pub struct TasksLayer {
     tx: mpsc::Sender<Event>,
     flush: Arc<Flush>,
@@ -58,7 +59,7 @@ enum WatchKind {
 }
 
 struct WatchRequest {
-    id: SpanId,
+    id: TaskId,
     stream_sender: oneshot::Sender<mpsc::Receiver<Result<proto::tasks::TaskDetails, tonic::Status>>>,
     buffer: usize,
 }
@@ -368,7 +369,7 @@ impl proto::tasks::tasks_server::Tasks for Server {
         // Check with the aggregator task to request a stream if the task exists.
         let (stream_sender, stream_recv) = oneshot::channel();
         permit.send(WatchKind::TaskDetail(WatchRequest {
-            id: task_id.clone(),
+            id: task_id.into(),
             stream_sender,
             buffer: self.client_buffer,
         }));
diff --git a/console/src/tasks.rs b/console/src/tasks.rs
index 0d357b955..8c689876b 100644
--- a/console/src/tasks.rs
+++ b/console/src/tasks.rs
@@ -38,7 +38,6 @@ pub(crate) type DetailsRef = Rc<RefCell<Option<Details>>>;
 #[derive(Debug)]
 pub(crate) struct Task {
     id: u64,
-    id_hex: String,
     fields: Vec<Field>,
     formatted_fields: Vec<Vec<Span<'static>>>,
     kind: &'static str,
@@ -191,7 +190,6 @@ impl State {
                 let stats = stats_update.remove(&id)?.into();
                 let mut task = Task {
                     id,
-                    id_hex: format!("{:x}", id),
                     fields,
                     formatted_fields,
                     kind,
@@ -260,10 +258,6 @@ impl Task {
         self.id
     }
 
-    pub(crate) fn id_hex(&self) -> &str {
-        &self.id_hex
-    }
-
     pub(crate) fn target(&self) -> &str {
         &self.target
     }
diff --git a/console/src/view/task.rs b/console/src/view/task.rs
index 289f3723c..e37098b59 100644
--- a/console/src/view/task.rs
+++ b/console/src/view/task.rs
@@ -99,7 +99,7 @@ impl TaskView {
             Span::raw(" = quit"),
         ]);
 
-        let attrs = Spans::from(vec![bold("ID: "), Span::raw(task.id_hex())]);
+        let attrs = Spans::from(vec![bold("ID: "), Span::raw(task.id().to_string())]);
         let target = Spans::from(vec![bold("Target: "), Span::raw(task.target())]);
 
         let mut total = vec![bold("Total Time: "), dur(task.total(now))];
diff --git a/console/src/view/tasks.rs b/console/src/view/tasks.rs
index 185548af4..57f9e2c10 100644
--- a/console/src/view/tasks.rs
+++ b/console/src/view/tasks.rs
@@ -108,7 +108,7 @@ impl List {
             let task = task.borrow();
             let mut row = Row::new(vec![
-                Cell::from(id_width.update_str(task.id_hex()).to_string()),
+                Cell::from(id_width.update_str(task.id().to_string())),
                 // TODO(eliza): is there a way to write a `fmt::Debug` impl
                 // directly to tui without doing an allocation?
                 Cell::from(task.kind().to_string()),
diff --git a/proto/tasks.proto b/proto/tasks.proto
index d7e2acab5..47406f0ef 100644
--- a/proto/tasks.proto
+++ b/proto/tasks.proto
@@ -11,11 +11,15 @@ service Tasks {
     rpc WatchTaskDetails(DetailsRequest) returns (stream TaskDetails) {}
 }
 
+message TaskId {
+    uint64 id = 1;
+}
+
 message TasksRequest {
 }
 
 message DetailsRequest {
-    common.SpanId id = 1;
+    TaskId id = 1;
 }
 
 // A task state update.
@@ -52,7 +56,7 @@ message TaskUpdate {
 // A task details update
 message TaskDetails {
     // The task's ID which the details belong to.
-    common.SpanId task_id = 1;
+    TaskId task_id = 1;
 
     google.protobuf.Timestamp now = 2;
 
@@ -69,7 +73,7 @@ message Task {
     // identified by this ID; if the client requires additional information
     // included in the `Task` message, it should store that data and access it
     // by ID.
-    common.SpanId id = 1;
+    TaskId id = 1;
     // The numeric ID of the task's `Metadata`.
     //
     // This identifies the `Metadata` that describes the `tracing` span
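
For review context, here is a minimal, self-contained sketch of the counter-plus-table scheme that `get_or_insert_task_id` implements in the aggregator above. The `IdMapper` name and the plain `u64` standing in for `tracing_core::span::Id` are illustrative only; in the patch itself this state lives in the `task_id_counter` and `task_id_mappings` fields:

```rust
use std::collections::{hash_map::Entry, HashMap};

// Pretty, sequential task IDs, mirroring `pub type TaskId = u64` in the patch.
type TaskId = u64;

// Illustrative stand-in for the aggregator's ID-mapping state; a bare `u64`
// is used here in place of `tracing_core::span::Id`.
struct IdMapper {
    counter: TaskId,
    mappings: HashMap<u64, TaskId>,
}

impl IdMapper {
    fn new() -> Self {
        Self {
            counter: 0,
            mappings: HashMap::new(),
        }
    }

    // Returns the task ID already assigned to this span, or assigns the next
    // sequential one. As in the patch, the counter wraps on overflow rather
    // than panicking.
    fn get_or_insert(&mut self, span_id: u64) -> TaskId {
        match self.mappings.entry(span_id) {
            Entry::Occupied(entry) => *entry.get(),
            Entry::Vacant(entry) => {
                let task_id = self.counter;
                entry.insert(task_id);
                self.counter = self.counter.wrapping_add(1);
                task_id
            }
        }
    }
}

fn main() {
    let mut mapper = IdMapper::new();
    // Span IDs handed out by `tracing` are often large and non-contiguous;
    // the table flattens them into small, human-readable task IDs.
    assert_eq!(mapper.get_or_insert(4_294_967_297), 0);
    assert_eq!(mapper.get_or_insert(8_589_934_593), 1);
    // Looking up an already-seen span returns the same pretty ID.
    assert_eq!(mapper.get_or_insert(4_294_967_297), 0);
}
```

This is also why every event handler in the patch calls `get_or_insert_task_id` before touching `TaskData`: wakers and other late events can arrive for spans the aggregator has already seen, and the persistent table keeps their pretty IDs stable across events.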