diff --git a/proto/event.proto b/proto/event.proto index 433e3ad2f24c1..17c267bd09bf7 100644 --- a/proto/event.proto +++ b/proto/event.proto @@ -5,14 +5,52 @@ package event.proto; message EventWrapper { oneof event { Log log = 1; + Metric metric = 2; } } +message Log { + map structured = 1; +} + message Value { bytes data = 1; bool explicit = 2; } -message Log { - map structured = 1; +message Metric { + oneof metric { + Counter counter = 1; + Timer timer = 2; + Gauge gauge = 3; + Set set = 4; + } +} + +message Counter { + string name = 1; + uint32 val = 2; + float sampling = 3; +} + +message Timer { + string name = 1; + uint32 val = 2; + float sampling = 3; +} + +message Gauge { + string name = 1; + uint32 val = 2; + enum Direction { + None = 0; + Plus = 1; + Minus = 2; + } + Direction direction = 3; +} + +message Set { + string name = 1; + string val = 2; } diff --git a/src/event.rs b/src/event.rs index 44b9fb1a0bb44..258a7d3af35a0 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,4 +1,4 @@ -use self::proto::{event_wrapper::Event as EventProto, Log}; +use self::proto::{event_wrapper::Event as EventProto, metric::Metric as MetricProto, Log}; use bytes::Bytes; use chrono::{DateTime, SecondsFormat, Utc}; use lazy_static::lazy_static; @@ -7,6 +7,10 @@ use std::borrow::Cow; use std::collections::HashMap; use string_cache::DefaultAtom as Atom; +pub mod metric; + +pub use metric::Metric; + pub mod proto { include!(concat!(env!("OUT_DIR"), "/event.proto.rs")); } @@ -20,6 +24,7 @@ lazy_static! { #[derive(PartialEq, Debug, Clone)] pub enum Event { Log(LogEvent), + Metric(Metric), } #[derive(PartialEq, Debug, Clone)] @@ -37,18 +42,28 @@ impl Event { pub fn as_log(&self) -> &LogEvent { match self { Event::Log(log) => log, + _ => panic!("failed type coercion, {:?} is not a log event", self), } } pub fn as_mut_log(&mut self) -> &mut LogEvent { match self { Event::Log(log) => log, + _ => panic!("failed type coercion, {:?} is not a log event", self), } } pub fn into_log(self) -> LogEvent { match self { Event::Log(log) => log, + _ => panic!("failed type coercion, {:?} is not a log event", self), + } + } + + pub fn into_metric(self) -> Metric { + match self { + Event::Metric(metric) => metric, + _ => panic!("failed type coercion, {:?} is not a metric", self), } } } @@ -233,6 +248,51 @@ impl From for Event { Event::Log(LogEvent { structured }) } + EventProto::Metric(proto) => { + let metric = proto.metric.unwrap(); + match metric { + MetricProto::Counter(counter) => { + let sampling = if counter.sampling == 0f32 { + None + } else { + Some(counter.sampling) + }; + Event::Metric(Metric::Counter { + name: counter.name, + val: counter.val, + sampling, + }) + } + MetricProto::Timer(timer) => { + let sampling = if timer.sampling == 0f32 { + None + } else { + Some(timer.sampling) + }; + Event::Metric(Metric::Timer { + name: timer.name, + val: timer.val, + sampling, + }) + } + MetricProto::Gauge(gauge) => { + let direction = match gauge.direction() { + proto::gauge::Direction::None => None, + proto::gauge::Direction::Plus => Some(metric::Direction::Plus), + proto::gauge::Direction::Minus => Some(metric::Direction::Minus), + }; + Event::Metric(Metric::Gauge { + name: gauge.name, + val: gauge.val, + direction, + }) + } + MetricProto::Set(set) => Event::Metric(Metric::Set { + name: set.name, + val: set.val, + }), + } + } } } } @@ -256,20 +316,77 @@ impl From for proto::EventWrapper { proto::EventWrapper { event: Some(event) } } + Event::Metric(Metric::Counter { + name, + val, + sampling, + }) => { + let counter = proto::Counter { + name, + val, + sampling: sampling.unwrap_or(0f32), + }; + let event = EventProto::Metric(proto::Metric { + metric: Some(MetricProto::Counter(counter)), + }); + proto::EventWrapper { event: Some(event) } + } + Event::Metric(Metric::Timer { + name, + val, + sampling, + }) => { + let timer = proto::Timer { + name, + val, + sampling: sampling.unwrap_or(0f32), + }; + let event = EventProto::Metric(proto::Metric { + metric: Some(MetricProto::Timer(timer)), + }); + proto::EventWrapper { event: Some(event) } + } + Event::Metric(Metric::Gauge { + name, + val, + direction, + }) => { + let direction = match direction { + None => proto::gauge::Direction::None, + Some(metric::Direction::Plus) => proto::gauge::Direction::Plus, + Some(metric::Direction::Minus) => proto::gauge::Direction::Minus, + } + .into(); + let gauge = proto::Gauge { + name, + val, + direction, + }; + let event = EventProto::Metric(proto::Metric { + metric: Some(MetricProto::Gauge(gauge)), + }); + proto::EventWrapper { event: Some(event) } + } + Event::Metric(Metric::Set { name, val }) => { + let set = proto::Set { name, val }; + let event = EventProto::Metric(proto::Metric { + metric: Some(MetricProto::Set(set)), + }); + proto::EventWrapper { event: Some(event) } + } } } } +// TODO: should probably get rid of this impl From for Vec { fn from(event: Event) -> Vec { - match event { - Event::Log(LogEvent { mut structured }) => structured - .remove(&MESSAGE) - .unwrap() - .value - .as_bytes() - .into_owned(), - } + event + .into_log() + .into_value(&MESSAGE) + .unwrap() + .as_bytes() + .into_owned() } } diff --git a/src/event/metric.rs b/src/event/metric.rs new file mode 100644 index 0000000000000..ae98607ac6496 --- /dev/null +++ b/src/event/metric.rs @@ -0,0 +1,28 @@ +#[derive(Debug, Clone, PartialEq)] +pub enum Metric { + Counter { + name: String, + val: u32, + sampling: Option, + }, + Timer { + name: String, + val: u32, + sampling: Option, + }, + Gauge { + name: String, + val: u32, + direction: Option, + }, + Set { + name: String, + val: String, + }, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum Direction { + Plus, + Minus, +} diff --git a/src/sinks/prometheus.rs b/src/sinks/prometheus.rs index 14a2762103ea2..fbfcd75a358fc 100644 --- a/src/sinks/prometheus.rs +++ b/src/sinks/prometheus.rs @@ -1,15 +1,16 @@ -use crate::buffers::Acker; -use crate::Event; +use crate::{ + buffers::Acker, + event::{metric::Direction, Metric}, + Event, +}; use futures::{future, Async, AsyncSink, Future, Sink}; -use hyper::service::service_fn; -use hyper::{header::HeaderValue, Body, Method, Request, Response, Server, StatusCode}; +use hyper::{ + header::HeaderValue, service::service_fn, Body, Method, Request, Response, Server, StatusCode, +}; use prometheus::{Encoder, Registry, TextEncoder}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use stream_cancel::{Trigger, Tripwire}; -use string_cache::DefaultAtom as Atom; use tokio_trace::field; #[derive(Deserialize, Serialize, Debug)] @@ -17,23 +18,6 @@ use tokio_trace::field; pub struct PrometheusSinkConfig { #[serde(default = "default_address")] pub address: SocketAddr, - pub counters: Vec, - pub gauges: Vec, -} - -#[derive(Deserialize, Serialize, Debug, Clone, Eq, PartialEq, Hash)] -pub struct Counter { - pub key: Atom, - pub label: String, - pub doc: String, - pub parse_value: bool, -} - -#[derive(Deserialize, Serialize, Debug, Clone, Eq, PartialEq, Hash)] -pub struct Gauge { - pub key: Atom, - pub label: String, - pub doc: String, } pub fn default_address() -> SocketAddr { @@ -45,24 +29,23 @@ pub fn default_address() -> SocketAddr { #[typetag::serde(name = "prometheus")] impl crate::topology::config::SinkConfig for PrometheusSinkConfig { fn build(&self, acker: Acker) -> Result<(super::RouterSink, super::Healthcheck), String> { - let sink = Box::new(PrometheusSink::new( - self.address, - self.counters.clone(), - self.gauges.clone(), - acker, - )); + let sink = Box::new(PrometheusSink::new(self.address, acker)); let healthcheck = Box::new(future::ok(())); Ok((sink, healthcheck)) } + + fn input_type(&self) -> crate::topology::config::DataType { + crate::topology::config::DataType::Metric + } } struct PrometheusSink { registry: Arc, server_shutdown_trigger: Option, address: SocketAddr, - counters: HashMap, - gauges: HashMap, + counters: HashMap, + gauges: HashMap, acker: Acker, } @@ -98,41 +81,39 @@ fn handle( } impl PrometheusSink { - fn new(address: SocketAddr, counters: Vec, gauges: Vec, acker: Acker) -> Self { - let registry = Registry::new(); - - let counters = counters - .into_iter() - .map(|config| { - let counter = - prometheus::Counter::new(config.label.clone(), config.doc.clone()).unwrap(); - registry.register(Box::new(counter.clone())).unwrap(); - - (config, counter) - }) - .collect(); - - let gauges = gauges - .into_iter() - .map(|config| { - let gauge = - prometheus::Gauge::new(config.label.clone(), config.doc.clone()).unwrap(); - registry.register(Box::new(gauge.clone())).unwrap(); - - (config, gauge) - }) - .collect(); - + fn new(address: SocketAddr, acker: Acker) -> Self { Self { - registry: Arc::new(registry), + registry: Arc::new(Registry::new()), server_shutdown_trigger: None, address, - counters, - gauges, + counters: HashMap::new(), + gauges: HashMap::new(), acker, } } + fn with_counter(&mut self, name: String, f: impl Fn(&prometheus::Counter)) { + if let Some(counter) = self.counters.get(&name) { + f(counter); + } else { + let counter = prometheus::Counter::new(name.clone(), name.clone()).unwrap(); + self.registry.register(Box::new(counter.clone())).unwrap(); + f(&counter); + self.counters.insert(name, counter); + } + } + + fn with_gauge(&mut self, name: String, f: impl Fn(&prometheus::Gauge)) { + if let Some(gauge) = self.gauges.get(&name) { + f(gauge); + } else { + let gauge = prometheus::Gauge::new(name.clone(), name.clone()).unwrap(); + self.registry.register(Box::new(gauge.clone())).unwrap(); + f(&gauge); + self.gauges.insert(name.clone(), gauge); + } + } + fn start_server_if_needed(&mut self) { if self.server_shutdown_trigger.is_some() { return; @@ -174,37 +155,27 @@ impl Sink for PrometheusSink { ) -> Result, Self::SinkError> { self.start_server_if_needed(); - for (field, counter) in &self.counters { - if let Some(val) = event.as_log().get(&field.key) { - if field.parse_value { - let val = val.to_string_lossy(); - - if let Ok(count) = val.parse() { - counter.inc_by(count); - } else { - warn!( - "Unable to parse value from field {} with value {}", - field.key, val - ); - } - } else { - counter.inc_by(1.0); - } - } - } - - for (field, gauge) in &self.gauges { - if let Some(val) = event.as_log().get(&field.key) { - let val = val.to_string_lossy(); - - if let Ok(count) = val.parse() { - gauge.set(count); - } else { - warn!( - "Unable to parse value from field {} with value {}", - field.key, val - ); + match event.into_metric() { + Metric::Counter { + name, + val, + // TODO: take sampling into account + sampling: _, + } => self.with_counter(name, |counter| counter.inc_by(val as f64)), + Metric::Gauge { + name, + val, + direction, + } => self.with_gauge(name, |gauge| { + let val = val as f64; + match direction { + None => gauge.set(val), + Some(Direction::Plus) => gauge.add(val), + Some(Direction::Minus) => gauge.sub(val), } + }), + _ => { + // TODO: support all the metric types } } diff --git a/src/sources/statsd.rs b/src/sources/statsd.rs index 0300aaa70c60a..3352ae427e134 100644 --- a/src/sources/statsd.rs +++ b/src/sources/statsd.rs @@ -12,35 +12,6 @@ use tokio_trace::field; mod parser; -#[derive(Debug, PartialEq)] -pub enum Metric { - Counter { - name: String, - val: usize, - sampling: Option, - }, - Timer { - name: String, - val: usize, - sampling: Option, - }, - Gauge { - name: String, - val: usize, - direction: Option, - }, - Set { - name: String, - val: String, - }, -} - -#[derive(Debug, PartialEq)] -pub enum Direction { - Plus, - Minus, -} - #[derive(Deserialize, Serialize, Debug)] struct StatsdConfig { address: SocketAddr, @@ -51,6 +22,10 @@ impl crate::topology::config::SourceConfig for StatsdConfig { fn build(&self, out: mpsc::Sender) -> Result { Ok(statsd(self.address.clone(), out)) } + + fn output_type(&self) -> crate::topology::config::DataType { + crate::topology::config::DataType::Metric + } } fn statsd(addr: SocketAddr, out: mpsc::Sender) -> super::Source { @@ -76,7 +51,7 @@ fn statsd(addr: SocketAddr, out: mpsc::Sender) -> super::Source { .lines() .map(parse) .filter_map(|res| res.map_err(|e| error!("{}", e)).ok()) - .map(Event::from) + .map(Event::Metric) .collect::>(); futures::stream::iter_ok::<_, std::io::Error>(metrics) }) @@ -88,26 +63,11 @@ fn statsd(addr: SocketAddr, out: mpsc::Sender) -> super::Source { ) } -impl From for Event { - fn from(metric: Metric) -> Event { - match metric { - Metric::Counter { name, val, .. } | Metric::Gauge { name, val, .. } => { - let mut event = Event::new_empty_log(); - event - .as_mut_log() - .insert_explicit(name.into(), val.to_string().into()); - event - } - _ => Event::from(format!("{:?}", metric)), - } - } -} - #[cfg(test)] mod test { use super::StatsdConfig; use crate::{ - sinks::prometheus::{Counter, Gauge, PrometheusSinkConfig}, + sinks::prometheus::PrometheusSinkConfig, test_util::{block_on, next_addr, shutdown_on_idle}, topology::{self, config}, }; @@ -121,24 +81,7 @@ mod test { let mut config = config::Config::empty(); config.add_source("in", StatsdConfig { address: in_addr }); - config.add_sink( - "out", - &["in"], - PrometheusSinkConfig { - address: out_addr, - counters: vec![Counter { - key: "foo".into(), - label: "foo".into(), - doc: "foo".into(), - parse_value: true, - }], - gauges: vec![Gauge { - key: "bar".into(), - label: "bar".into(), - doc: "bar".into(), - }], - }, - ); + config.add_sink("out", &["in"], PrometheusSinkConfig { address: out_addr }); let mut rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/src/sources/statsd/parser.rs b/src/sources/statsd/parser.rs index 221cc2c255522..7b42d43997170 100644 --- a/src/sources/statsd/parser.rs +++ b/src/sources/statsd/parser.rs @@ -1,4 +1,4 @@ -use super::{Direction, Metric}; +use crate::event::{metric::Direction, Metric}; use lazy_static::lazy_static; use regex::Regex; use std::{ @@ -137,7 +137,8 @@ impl From for ParseError { #[cfg(test)] mod test { - use super::{parse, sanitize_key, Direction, Metric}; + use super::{parse, sanitize_key}; + use crate::event::{metric::Direction, Metric}; #[test] fn basic_counter() { diff --git a/src/transforms.rs b/src/transforms.rs index 117a29d9a63a5..937cfc522370b 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -3,6 +3,7 @@ use crate::Event; pub mod add_fields; pub mod field_filter; pub mod json_parser; +pub mod log_to_metric; pub mod lua; pub mod regex_parser; pub mod remove_fields; diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs new file mode 100644 index 0000000000000..6a1bfa4bb4237 --- /dev/null +++ b/src/transforms/log_to_metric.rs @@ -0,0 +1,185 @@ +use super::Transform; +use crate::{event::metric::Metric, Event}; +use serde::{Deserialize, Serialize}; +use string_cache::DefaultAtom as Atom; + +#[derive(Deserialize, Serialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct LogToMetricConfig { + pub counters: Vec, + pub gauges: Vec, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct CounterConfig { + field: Atom, + parse_value: bool, +} + +pub struct LogToMetric { + config: LogToMetricConfig, +} + +#[typetag::serde(name = "log_to_metric")] +impl crate::topology::config::TransformConfig for LogToMetricConfig { + fn build(&self) -> Result, String> { + Ok(Box::new(LogToMetric::new(self.clone()))) + } + + fn input_type(&self) -> crate::topology::config::DataType { + crate::topology::config::DataType::Log + } + + fn output_type(&self) -> crate::topology::config::DataType { + crate::topology::config::DataType::Metric + } +} + +impl LogToMetric { + pub fn new(config: LogToMetricConfig) -> Self { + LogToMetric { config } + } +} + +impl Transform for LogToMetric { + fn transform(&self, event: Event) -> Option { + let event = event.into_log(); + + for counter in self.config.counters.iter() { + if let Some(val) = event.get(&counter.field) { + if counter.parse_value { + if let Ok(val) = val.to_string_lossy().parse() { + return Some(Event::Metric(Metric::Counter { + name: counter.field.to_string(), + val, + sampling: None, + })); + } else { + trace!("failed to parse counter value"); + return None; + } + } else { + return Some(Event::Metric(Metric::Counter { + name: counter.field.to_string(), + val: 1, + sampling: None, + })); + }; + } + } + + for name in self.config.gauges.iter() { + if let Some(val) = event.get(name) { + if let Ok(val) = val.to_string_lossy().parse() { + return Some(Event::Metric(Metric::Gauge { + name: name.to_string(), + val, + direction: None, + })); + } else { + trace!("failed to parse gauge value"); + return None; + } + } + } + + None + } +} + +#[cfg(test)] +mod tests { + use super::{CounterConfig, LogToMetric, LogToMetricConfig}; + use crate::{event::metric::Metric, transforms::Transform, Event}; + + fn config() -> LogToMetricConfig { + LogToMetricConfig { + counters: vec![ + CounterConfig { + field: "foo".into(), + parse_value: true, + }, + CounterConfig { + field: "bar".into(), + parse_value: false, + }, + ], + gauges: vec!["baz".into()], + } + } + + #[test] + fn counter_with_parsing() { + let mut log = Event::from("i am a log"); + log.as_mut_log().insert_explicit("foo".into(), "42".into()); + + let transform = LogToMetric::new(config()); + + let metric = transform.transform(log).unwrap(); + assert_eq!( + metric.into_metric(), + Metric::Counter { + name: "foo".into(), + val: 42, + sampling: None + } + ); + } + + #[test] + fn counter_without_parsing() { + let mut log = Event::from("i am a log"); + log.as_mut_log() + .insert_explicit("bar".into(), "nineteen".into()); + + let transform = LogToMetric::new(config()); + + let metric = transform.transform(log).unwrap(); + assert_eq!( + metric.into_metric(), + Metric::Counter { + name: "bar".into(), + val: 1, + sampling: None + } + ); + } + + #[test] + fn gauge() { + let mut log = Event::from("i am a log"); + log.as_mut_log().insert_explicit("baz".into(), "666".into()); + + let transform = LogToMetric::new(config()); + + let metric = transform.transform(log).unwrap(); + assert_eq!( + metric.into_metric(), + Metric::Gauge { + name: "baz".into(), + val: 666, + direction: None, + } + ); + } + + #[test] + fn parse_failure() { + let mut log = Event::from("i am a log"); + log.as_mut_log() + .insert_explicit("foo".into(), "not a number".into()); + + let transform = LogToMetric::new(config()); + assert_eq!(None, transform.transform(log)); + } + + #[test] + fn missing_field() { + let mut log = Event::from("i am a log"); + log.as_mut_log() + .insert_explicit("not foo".into(), "not a number".into()); + + let transform = LogToMetric::new(config()); + assert_eq!(None, transform.transform(log)); + } +}