Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metrics-exporter-tcp/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fn main() {
println!("cargo:rerun-if-changed=proto/event.proto");
let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
prost_build.btree_map(["."]);
prost_build.compile_protos(&["proto/event.proto"], &["proto/"]).unwrap();
}
2 changes: 1 addition & 1 deletion metrics-observer/build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
fn main() {
println!("cargo:rerun-if-changed=proto/event.proto");
let mut prost_build = prost_build::Config::new();
prost_build.btree_map(&["."]);
prost_build.btree_map(["."]);
prost_build.compile_protos(&["proto/event.proto"], &["proto/"]).unwrap();
}
27 changes: 7 additions & 20 deletions metrics-observer/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,16 @@ message Metric {
string name = 1;
google.protobuf.Timestamp timestamp = 2;
map<string, string> labels = 3;
oneof value {
Counter counter = 4;
Gauge gauge = 5;
Histogram histogram = 6;
oneof operation {
uint64 increment_counter = 4;
uint64 set_counter = 5;
double increment_gauge = 6;
double decrement_gauge = 7;
double set_gauge = 8;
double record_histogram = 9;
}
}

message Counter {
uint64 value = 1;
}

message Gauge {
oneof value {
double absolute = 1;
double increment = 2;
double decrement = 3;
}
}

message Histogram {
double value = 1;
}

message Event {
oneof event {
Metadata metadata = 1;
Expand Down
197 changes: 107 additions & 90 deletions metrics-observer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ mod proto {
include!(concat!(env!("OUT_DIR"), "/event.proto.rs"));
}

use self::proto::{
event::Event,
metadata::{Description as DescriptionMetadata, MetricType, Unit as UnitMetadata},
Event as EventWrapper,
};
use proto::{event::Event, metadata::MetricType, metric::Operation, Event as ProstMessage};

type MetadataKey = (MetricKind, String);
type MetadataValue = (Option<Unit>, Option<String>);

#[derive(Clone)]
pub enum ClientState {
Expand All @@ -38,7 +37,7 @@ pub enum MetricData {
pub struct Client {
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
}

impl Client {
Expand Down Expand Up @@ -93,15 +92,15 @@ struct Runner {
addr: String,
client_state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
}

impl Runner {
pub fn new(
addr: String,
state: Arc<Mutex<ClientState>>,
metrics: Arc<RwLock<BTreeMap<CompositeKey, MetricData>>>,
metadata: Arc<RwLock<HashMap<(MetricKind, String), (Option<Unit>, Option<String>)>>>,
metadata: Arc<RwLock<HashMap<MetadataKey, MetadataValue>>>,
) -> Runner {
Runner { state: RunnerState::Disconnected, addr, client_state: state, metrics, metadata }
}
Expand Down Expand Up @@ -172,98 +171,116 @@ impl Runner {
Err(e) => eprintln!("read error: {:?}", e),
};

let event = match EventWrapper::decode_length_delimited(&mut buf) {
let message = match ProstMessage::decode_length_delimited(&mut buf) {
Err(e) => {
eprintln!("decode error: {:?}", e);
continue;
}
Ok(event) => event,
Ok(v) => v,
};

if let Some(event) = event.event {
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_kind = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_kind, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata
.unit
.map(|u| match u {
UnitMetadata::UnitValue(us) => us,
})
.and_then(|s| Unit::from_string(s.as_str()));
*dentry = metadata.description.map(|d| match d {
DescriptionMetadata::DescriptionValue(ds) => ds,
});
}
Event::Metric(metric) => {
let mut labels_raw =
metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: Key = (metric.name, labels).into();
let event = match message.event {
Some(e) => e,
None => continue,
};

match metric.value.expect("no metric value") {
proto::metric::Value::Counter(value) => {
let key =
CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value.value;
}
match event {
Event::Metadata(metadata) => {
let metric_type = MetricType::from_i32(metadata.metric_type)
.expect("unknown metric type over wire");
let metric_kind = match metric_type {
MetricType::Counter => MetricKind::Counter,
MetricType::Gauge => MetricKind::Gauge,
MetricType::Histogram => MetricKind::Histogram,
};
let key = (metric_kind, metadata.name);
let mut mmap = self
.metadata
.write()
.expect("failed to get metadata write lock");
let entry = mmap.entry(key).or_insert((None, None));
let (uentry, dentry) = entry;
*uentry = metadata
.unit
.map(|u| match u {
proto::metadata::Unit::UnitValue(u) => u,
})
.and_then(|s| Unit::from_string(s.as_str()));
*dentry = metadata.description.map(|d| match d {
proto::metadata::Description::DescriptionValue(ds) => ds,
});
}
Event::Metric(metric) => {
let mut labels_raw = metric.labels.into_iter().collect::<Vec<_>>();
labels_raw.sort_by(|a, b| a.0.cmp(&b.0));
let labels = labels_raw
.into_iter()
.map(|(k, v)| Label::new(k, v))
.collect::<Vec<_>>();
let key_data: Key = (metric.name, labels).into();

match metric.operation.expect("no metric operation") {
Operation::IncrementCounter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner += value;
}
proto::metric::Value::Gauge(value) => {
let key =
CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
match value.value {
Some(proto::gauge::Value::Absolute(val)) => {
*inner = val
}
Some(proto::gauge::Value::Increment(val)) => {
*inner += val
}
Some(proto::gauge::Value::Decrement(val)) => {
*inner -= val
}
None => {}
}
}
}
Operation::SetCounter(value) => {
let key = CompositeKey::new(MetricKind::Counter, key_data);
let mut metrics = self.metrics.write().unwrap();
let counter = metrics
.entry(key)
.or_insert_with(|| MetricData::Counter(0));
if let MetricData::Counter(inner) = counter {
*inner = value;
}
proto::metric::Value::Histogram(value) => {
let key =
CompositeKey::new(MetricKind::Histogram, key_data);
let mut metrics = self.metrics.write().unwrap();
let histogram =
metrics.entry(key).or_insert_with(|| {
let summary = Summary::with_defaults();
MetricData::Histogram(summary)
});
}
Operation::IncrementGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner += value;
}
}
Operation::DecrementGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner -= value;
}
}
Operation::SetGauge(value) => {
let key = CompositeKey::new(MetricKind::Gauge, key_data);
let mut metrics = self.metrics.write().unwrap();
let gauge = metrics
.entry(key)
.or_insert_with(|| MetricData::Gauge(0.0));
if let MetricData::Gauge(inner) = gauge {
*inner = value;
}
}
Operation::RecordHistogram(value) => {
let key =
CompositeKey::new(MetricKind::Histogram, key_data);
let mut metrics = self.metrics.write().unwrap();
let histogram = metrics.entry(key).or_insert_with(|| {
let summary = Summary::with_defaults();
MetricData::Histogram(summary)
});

if let MetricData::Histogram(inner) = histogram {
inner.add(value.value);
}
if let MetricData::Histogram(inner) = histogram {
inner.add(value);
}
}
}
Expand Down