Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ tracing = "0.1.26"
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["fmt", "registry"] }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# The parking_lot dependency is renamed, because we want our `parking_lot`
# feature to also enable `tracing-subscriber`'s parking_lot feature flag.
parking_lot_crate = { package = "parking_lot", version = "0.11", optional = true }
humantime = "2.1.0"

# Required for recording:
serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

[dev-dependencies]
tokio = { version = "^1.7", features = ["full", "rt-multi-thread"] }
futures = "0.3"
Expand Down
66 changes: 16 additions & 50 deletions console-subscriber/src/aggregator/id_data.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use super::{shrink::ShrinkMap, DroppedAt, Id, ToProto};
use super::{shrink::ShrinkMap, Id, ToProto};
use crate::stats::{DroppedAt, Unsent};
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::time::{Duration, SystemTime};

pub(crate) struct IdData<T> {
data: ShrinkMap<Id, (T, bool)>,
data: ShrinkMap<Id, T>,
}

pub(crate) struct Updating<'a, T>(&'a mut (T, bool));

pub(crate) enum Include {
All,
UpdatedOnly,
Expand All @@ -19,31 +17,19 @@ pub(crate) enum Include {
impl<T> Default for IdData<T> {
fn default() -> Self {
IdData {
data: ShrinkMap::<Id, (T, bool)>::new(),
data: ShrinkMap::<Id, T>::new(),
}
}
}

impl<T> IdData<T> {
pub(crate) fn update_or_default(&mut self, id: Id) -> Updating<'_, T>
where
T: Default,
{
Updating(self.data.entry(id).or_default())
}

pub(crate) fn update(&mut self, id: &Id) -> Option<Updating<'_, T>> {
self.data.get_mut(id).map(Updating)
}

impl<T: Unsent> IdData<T> {
pub(crate) fn insert(&mut self, id: Id, data: T) {
self.data.insert(id, (data, true));
self.data.insert(id, data);
}

pub(crate) fn since_last_update(&mut self) -> impl Iterator<Item = (&Id, &mut T)> {
self.data.iter_mut().filter_map(|(id, (data, dirty))| {
if *dirty {
*dirty = false;
self.data.iter_mut().filter_map(|(id, data)| {
if data.take_unsent() {
Some((id, data))
} else {
None
Expand All @@ -52,11 +38,11 @@ impl<T> IdData<T> {
}

pub(crate) fn all(&self) -> impl Iterator<Item = (&Id, &T)> {
self.data.iter().map(|(id, (data, _))| (id, data))
self.data.iter()
}

pub(crate) fn get(&self, id: &Id) -> Option<&T> {
self.data.get(id).map(|(data, _)| data)
self.data.get(id)
}

pub(crate) fn as_proto(&mut self, include: Include) -> HashMap<u64, T::Output>
Expand All @@ -75,7 +61,7 @@ impl<T> IdData<T> {
}
}

pub(crate) fn drop_closed<R: DroppedAt>(
pub(crate) fn drop_closed<R: DroppedAt + Unsent>(
&mut self,
stats: &mut IdData<R>,
now: SystemTime,
Expand All @@ -92,18 +78,19 @@ impl<T> IdData<T> {
// drop closed entities
tracing::trace!(?retention, has_watchers, "dropping closed");

stats.data.retain_and_shrink(|id, (stats, dirty)| {
stats.data.retain_and_shrink(|id, stats| {
if let Some(dropped_at) = stats.dropped_at() {
let dropped_for = now.duration_since(dropped_at).unwrap_or_default();
let dirty = stats.is_unsent();
let should_drop =
// if there are any clients watching, retain all dirty tasks regardless of age
(*dirty && has_watchers)
(dirty && has_watchers)
|| dropped_for > retention;
tracing::trace!(
stats.id = ?id,
stats.dropped_at = ?dropped_at,
stats.dropped_for = ?dropped_for,
stats.dirty = *dirty,
stats.dirty = dirty,
should_drop,
);
return !should_drop;
Expand All @@ -114,27 +101,6 @@ impl<T> IdData<T> {

// drop closed entities which no longer have stats.
self.data
.retain_and_shrink(|id, (_, _)| stats.data.contains_key(id));
}
}

// === impl Updating ===

impl<'a, T> Deref for Updating<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.0 .0
}
}

impl<'a, T> DerefMut for Updating<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0 .0
}
}

impl<'a, T> Drop for Updating<'a, T> {
fn drop(&mut self) {
self.0 .1 = true;
.retain_and_shrink(|id, _| stats.data.contains_key(id));
}
}
Loading