-
Notifications
You must be signed in to change notification settings - Fork 124
/
tracing_layers.rs
111 lines (98 loc) · 3.95 KB
/
tracing_layers.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use parking_lot::Mutex;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::DerefMut;
use std::sync::Arc;
use tracing::span::Attributes;
use tracing::subscriber::Interest;
use tracing::Id;
use tracing::{Event, Metadata, Subscriber};
use tracing_subscriber::registry::SpanRef;
use tracing_subscriber::{layer::Context, Layer};
use crate::metrics_collection::{
CollectedMetricChildren, CollectedMetricPrimitive, CollectedMetrics, RootMetricCollectedMetrics,
};
use crate::metrics_visitors::{MetricDataVisitor, SpanFields};
/// A `tracing_subscriber` layer that gathers metric events into a shared
/// [`RootMetricCollectedMetrics`] tree.
///
/// The type parameter `S` is the subscriber the layer is stacked on; it is
/// only carried as a marker and no value of `S` is stored.
pub struct MetricsCollectorLayer<S> {
/// Per-span field data recorded when a span is created, keyed by span id.
spans: Mutex<HashMap<Id, SpanFields>>,
/// Root of the collected metrics; the same `Arc` is also returned to the
/// caller by `new()`.
root_collected_metrics: Arc<Mutex<RootMetricCollectedMetrics>>,
/// Zero-sized marker for the subscriber type `S`.
_subscriber: PhantomData<S>,
}
impl<S> MetricsCollectorLayer<S> {
    /// Builds a new layer together with a handle to the metrics it fills in.
    ///
    /// Returns a `(layer, handle)` pair. Both refer to the same
    /// default-initialised [`RootMetricCollectedMetrics`] behind an
    /// `Arc<Mutex<_>>`, so whatever the layer records is observable through
    /// the returned handle.
    pub fn new() -> (Self, Arc<Mutex<RootMetricCollectedMetrics>>) {
        let shared_root = Arc::new(Mutex::new(RootMetricCollectedMetrics::default()));
        let layer = Self {
            spans: Mutex::new(HashMap::new()),
            // The layer keeps its own reference; the caller gets the other one.
            root_collected_metrics: Arc::clone(&shared_root),
            _subscriber: PhantomData,
        };
        (layer, shared_root)
    }
}
impl<S> Layer<S> for MetricsCollectorLayer<S>
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Debug,
{
    /// Enables only callsites whose target is exactly `"nativelink_metric"`.
    fn enabled(&self, metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool {
        metadata.target() == "nativelink_metric"
    }

    /// Records the new span's name and attribute values and stores them under
    /// the span's id so `on_event` can later resolve the span hierarchy.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
        let mut span_fields = SpanFields {
            name: Cow::Borrowed(attrs.metadata().name()),
        };
        attrs.values().record(&mut span_fields);
        self.spans.lock().insert(id.clone(), span_fields);
    }

    /// Drops the bookkeeping entry for a span once it has closed.
    ///
    /// Without this, every span ever created would stay in `spans` for the
    /// lifetime of the layer. Removing an id that was never stored is a no-op.
    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
        self.spans.lock().remove(&id);
    }

    /// Converts the event into a [`CollectedMetricPrimitive`] and stores it
    /// under the name reported by the visitor.
    ///
    /// If there is a current span, the metric is inserted into the component
    /// reached by walking the span scope from the root; otherwise it is
    /// inserted directly at the root of the collected metrics.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let mut event_visitor = MetricDataVisitor::default();
        event.record(&mut event_visitor);
        // Take the name first: the conversion below consumes the visitor.
        let name = event_visitor.name.clone();
        let primitive = CollectedMetricPrimitive::from(event_visitor);

        // Lock order is root metrics first, then `spans`.
        let mut root_collected_metrics = self.root_collected_metrics.lock();
        let mut collected_component = root_collected_metrics.deref_mut().deref_mut();
        if let Some(current_span) = ctx.lookup_current() {
            let mut known_spans = self.spans.lock();
            let span_iter = current_span.scope().from_root();
            self.find_component(span_iter, known_spans.deref_mut(), &mut collected_component)
                .insert(name, CollectedMetrics::Primitive(primitive));
        } else {
            collected_component.insert(name, CollectedMetrics::Primitive(primitive));
        }
    }

    /// Reports interest in every callsite.
    ///
    // NOTE(review): this returns `Interest::always()` regardless of the target
    // that `enabled` filters on — confirm that this is intentional and that
    // events from other targets cannot reach `on_event` through cached
    // callsite interest.
    fn register_callsite(&self, _metadata: &'static Metadata<'static>) -> Interest {
        Interest::always()
    }
}
impl<S> MetricsCollectorLayer<S>
where
    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a> + Debug,
{
    /// Descends from `collected_component` through one nested component per
    /// span yielded by `iter`, creating missing components on the way, and
    /// returns the component reached after the last span.
    ///
    /// Each span is looked up in `known_spans` by id and its recorded name is
    /// used as the key of the child component. With an empty iterator the
    /// starting component is returned unchanged.
    ///
    /// # Panics
    ///
    /// Panics if a span's id is missing from `known_spans`, or if the entry
    /// already stored under a span's name is not a component.
    fn find_component<'a, 'b, I>(
        &self,
        iter: I,
        known_spans: &'a mut HashMap<Id, SpanFields>,
        mut collected_component: &'a mut CollectedMetricChildren,
    ) -> &'a mut CollectedMetricChildren
    where
        I: Iterator<Item = SpanRef<'b, S>>,
    {
        for span in iter {
            let fields = known_spans.get(&span.id()).expect("Span not found");
            let child_key = fields.name.to_string();
            let child = collected_component
                .entry(child_key)
                .or_insert_with(|| CollectedMetrics::new_component());
            // Step one level down; the next span (if any) nests under this one.
            collected_component = match child {
                CollectedMetrics::Component(component) => component.deref_mut(),
                _ => panic!("Expected to be component"),
            };
        }
        collected_component
    }
}