Skip to content

Commit

Permalink
Increase DogStatsD Buffer Size and Pattern Match Container Ids (#698)
Browse files Browse the repository at this point in the history
* show dogstatsd logs in serverless mini agent

* increase buffer size to 8192 bytes

* update metric pattern to exclude service checks and to account for container ids

* exclude events from being parsed

* add comment for 8KB max buffer size matching default value in Go Agent

* lazily initialize static regex for dogstatsd metrics

* minor refactors

* add unit tests

* remove explicit drops
  • Loading branch information
duncanpharvey authored Oct 30, 2024
1 parent 490a276 commit 214f046
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 40 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ hashbrown = { version = "0.14.3", default-features = false, features = ["inline-
protobuf = { version = "3.5.0", default-features = false }
ustr = { version = "1.0.0", default-features = false }
fnv = { version = "1.0.7", default-features = false }
lazy_static = { version = "1.5.0", default-features = false }
reqwest = { version = "0.12.4", features = ["json", "http2", "rustls-tls"], default-features = false }
serde = { version = "1.0.197", default-features = false, features = ["derive"] }
serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] }
Expand All @@ -27,3 +28,4 @@ regex = { version = "1.10.6", default-features = false }
[dev-dependencies]
mockito = { version = "1.5.0", default-features = false }
proptest = "1.4.0"
tracing-test = { version = "0.2.5", default-features = false }
31 changes: 29 additions & 2 deletions dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ impl BufferReader {
match self {
BufferReader::UdpSocketReader(socket) => {
// TODO(astuyve) this should be dynamic
let mut buf = [0; 1024]; // todo, do we want to make this dynamic? (not sure)
// Max buffer size is configurable in Go Agent and the default is 8KB
// https://github.com/DataDog/datadog-agent/blob/85939a62b5580b2a15549f6936f257e61c5aa153/pkg/config/config_template.yaml#L2154-L2158
let mut buf = [0; 8192];
let (amt, src) = socket
.recv_from(&mut buf)
.await
Expand Down Expand Up @@ -85,7 +87,7 @@ impl DogStatsD {

fn insert_metrics(&self, msg: Split<char>) {
let all_valid_metrics: Vec<Metric> = msg
.filter(|m| !m.is_empty())
.filter(|m| !m.is_empty() && !m.starts_with("_sc|") && !m.starts_with("_e{")) // exclude empty messages, service checks, and events
.map(|m| m.replace('\n', ""))
.filter_map(|m| match parse(m.as_str()) {
Ok(metric) => Some(metric),
Expand Down Expand Up @@ -114,6 +116,7 @@ mod tests {
use crate::metric::EMPTY_TAGS;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use tracing_test::traced_test;

#[tokio::test]
#[cfg_attr(miri, ignore)]
Expand Down Expand Up @@ -184,6 +187,30 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
assert_value(&locked_aggregator, "metric123", 99_123.0, "");
}

#[tokio::test]
#[traced_test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_filter_service_check() {
    // Feed a single service-check payload (`_sc|` prefix) through the listener.
    let shared_aggregator = setup_dogstatsd("_sc|servicecheck|0").await;
    let guard = shared_aggregator.lock().expect("lock poisoned");
    let series = guard.to_series();

    // The payload must not surface as a parse failure in the captured logs,
    // and it must not produce any aggregated series.
    assert!(!logs_contain("Failed to parse metric"));
    assert_eq!(series.len(), 0);
}

#[tokio::test]
#[traced_test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_filter_event() {
    // Feed a single event payload (`_e{` prefix) through the listener.
    let shared_aggregator = setup_dogstatsd("_e{5,10}:event|test event").await;
    let guard = shared_aggregator.lock().expect("lock poisoned");
    let series = guard.to_series();

    // The payload must not surface as a parse failure in the captured logs,
    // and it must not produce any aggregated series.
    assert!(!logs_contain("Failed to parse metric"));
    assert_eq!(series.len(), 0);
}

async fn setup_dogstatsd(statsd_string: &str) -> Arc<Mutex<Aggregator>> {
let aggregator_arc = Arc::new(Mutex::new(
Aggregator::new(EMPTY_TAGS, 1_024).expect("aggregator creation failed"),
Expand Down
77 changes: 43 additions & 34 deletions dogstatsd/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@ use crate::errors::ParseError;
use crate::{constants, datadog};
use ddsketch_agent::DDSketch;
use fnv::FnvHasher;
use lazy_static::lazy_static;
use protobuf::Chars;
use regex::Regex;
use std::hash::{Hash, Hasher};
use ustr::Ustr;

pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() };

// Pattern for a single DogStatsD metric line. `lazy_static!` compiles it on
// first access and the compiled `Regex` is reused afterwards.
//
// Accepted shape:
//   <name>:<values>|<type>[|@<sample_rate>][|#<tags>][|c:<container_id>]
//
//   - name:         one or more characters other than `:`
//   - values:       one or more characters other than `|`
//   - type:         exactly one of `c`, `g`, `d`
//   - sample_rate:  optional; digits and dots only
//   - tags:         optional; one or more characters other than `|`
//   - container_id: optional; one or more characters other than `|`
//
// The optional fields are only recognised in the order shown, and the pattern
// is anchored at both ends, so a line carrying any other field does not match.
lazy_static! {
static ref METRIC_REGEX: regex::Regex = Regex::new(
r"^(?P<name>[^:]+):(?P<values>[^|]+)\|(?P<type>[cgd])(?:\|@(?P<sample_rate>[\d.]+))?(?:\|#(?P<tags>[^|]+))?(?:\|c:(?P<container_id>[^|]+))?$",
// The pattern is a fixed literal, so a failure here means the literal itself is malformed.
).expect("Failed to create metric regex");
}

#[derive(Clone, Debug)]
pub enum MetricValue {
/// Dogstatsd 'count' metric type, monotonically increasing counter
Expand Down Expand Up @@ -167,41 +174,37 @@ impl Metric {
/// example aj-test.increment:1|c|#user:aj-test from 127.0.0.1:50983
pub fn parse(input: &str) -> Result<Metric, ParseError> {
// TODO must enforce / exploit constraints given in `constants`.
if let Ok(re) = Regex::new(
r"^(?P<name>[^:]+):(?P<values>[^|]+)\|(?P<type>[cgd])(?:\|@(?P<sample_rate>[\d.]+))?(?:\|#(?P<tags>[^|]+))?$",
) {
if let Some(caps) = re.captures(input) {
// unused for now
// let sample_rate = caps.name("sample_rate").map(|m| m.as_str());

let tags;
if let Some(tags_section) = caps.name("tags") {
tags = Some(SortedTags::parse(tags_section.as_str())?);
} else {
tags = None;
}
let val = first_value(caps.name("values").unwrap().as_str())?;
let metric_value = match caps.name("type").unwrap().as_str() {
"c" => MetricValue::Count(val),
"g" => MetricValue::Gauge(val),
"d" => {
let sketch = &mut DDSketch::default();
sketch.insert(val);
MetricValue::Distribution(sketch.to_owned())
}
_ => {
return Err(ParseError::Raw("Unsupported metric type"));
}
};
let name = Ustr::from(caps.name("name").unwrap().as_str());
let id = id(name, &tags);
return Ok(Metric {
name,
value: metric_value,
tags,
id,
});
if let Some(caps) = METRIC_REGEX.captures(input) {
// unused for now
// let sample_rate = caps.name("sample_rate").map(|m| m.as_str());

let tags;
if let Some(tags_section) = caps.name("tags") {
tags = Some(SortedTags::parse(tags_section.as_str())?);
} else {
tags = None;
}
let val = first_value(caps.name("values").unwrap().as_str())?;
let metric_value = match caps.name("type").unwrap().as_str() {
"c" => MetricValue::Count(val),
"g" => MetricValue::Gauge(val),
"d" => {
let sketch = &mut DDSketch::default();
sketch.insert(val);
MetricValue::Distribution(sketch.to_owned())
}
_ => {
return Err(ParseError::Raw("Unsupported metric type"));
}
};
let name = Ustr::from(caps.name("name").unwrap().as_str());
let id = id(name, &tags);
return Ok(Metric {
name,
value: metric_value,
tags,
id,
});
}
Err(ParseError::Raw("Invalid metric format"))
}
Expand Down Expand Up @@ -468,4 +471,10 @@ mod tests {
fn invalid_dogstatsd_no_panic() {
assert!(parse("somerandomstring|c+a;slda").is_err());
}

#[test]
#[cfg_attr(miri, ignore)]
fn parse_container_id() {
    // A metric line that ends with a `|c:<container id>` field after its tags
    // must still be accepted by `parse`.
    let line = "containerid.metric:0|c|#env:dev,client_transport:udp|c:0000000000000000000000000000000000000000000000000000000000000000";
    let outcome = parse(line);
    assert!(outcome.is_ok());
}
}
3 changes: 3 additions & 0 deletions serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ datadog-trace-utils = { path = "../trace-utils" }
dogstatsd = { path = "../dogstatsd" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"]}
tokio-util = { version = "0.7", default-features = false }
tracing = { version = "0.1", default-features = false }
tracing-core = { version = "0.1", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "registry", "fmt", "env-filter", "tracing-log"] }

[[bin]]
name = "datadog-serverless-trace-mini-agent"
Expand Down
31 changes: 27 additions & 4 deletions serverless/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use env_logger::{Builder, Env, Target};
use env_logger::Builder;
use log::{debug, error, info};
use std::{env, sync::Arc, sync::Mutex};
use std::{env, str::FromStr, sync::Arc, sync::Mutex};
use tokio::time::{interval, Duration};
use tracing_subscriber::EnvFilter;

use datadog_trace_mini_agent::{
config, env_verifier, mini_agent, stats_flusher, stats_processor, trace_flusher,
Expand All @@ -27,8 +28,11 @@ const AGENT_HOST: &str = "0.0.0.0";

#[tokio::main]
pub async fn main() {
let env = Env::new().filter_or("DD_LOG_LEVEL", "info");
Builder::from_env(env).target(Target::Stdout).init();
let log_level = env::var("DD_LOG_LEVEL")
.map(|val| val.to_lowercase())
.unwrap_or("info".to_string());
let level_filter = log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info);
Builder::new().filter_level(level_filter).init();

let dd_api_key: Option<String> = env::var("DD_API_KEY").ok();
let dd_dogstatsd_port: u16 = env::var("DD_DOGSTATSD_PORT")
Expand All @@ -48,6 +52,25 @@ pub async fn main() {
let mini_agent_version = env!("CARGO_PKG_VERSION").to_string();
env::set_var("DD_MINI_AGENT_VERSION", mini_agent_version);

let env_filter = format!("h2=off,hyper=off,rustls=off,{}", log_level);

let subscriber = tracing_subscriber::fmt::Subscriber::builder()
.with_env_filter(
EnvFilter::try_new(env_filter).expect("could not parse log level in configuration"),
)
.with_level(true)
.with_thread_names(false)
.with_thread_ids(false)
.with_line_number(false)
.with_file(false)
.with_target(false)
.without_time()
.finish();

tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");

debug!("Logging subsystem enabled");

let env_verifier = Arc::new(env_verifier::ServerlessEnvVerifier::default());

let trace_flusher = Arc::new(trace_flusher::ServerlessTraceFlusher {});
Expand Down

0 comments on commit 214f046

Please sign in to comment.