diff --git a/Cargo.toml b/Cargo.toml index 805cc1ecefb48..a0567530629cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -912,7 +912,8 @@ component-validation-tests = [ "sources-http_server", "sinks-http", "sinks-splunk_hec", - "sources-splunk_hec" + "sources-splunk_hec", + "sinks-datadog_logs", ] # Grouping together features for benchmarks. We exclude the API client due to it causing the build process to run out diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 76f88f2c03297..704444ded4c21 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -521,6 +521,10 @@ fn spawn_component_topology( let mut config = config_builder .build() .expect("config should not have any errors"); + + // It's possible we could extend the framework to allow specifying logic to + // handle that, but I don't see much value currently since the healthcheck is + // not enforced for components, and it doesn't impact the internal telemetry. config.healthchecks.enabled = false; _ = std::thread::spawn(move || { diff --git a/src/sinks/datadog/logs/config.rs b/src/sinks/datadog/logs/config.rs index 0163b9f6633a0..fe2f5f8306f44 100644 --- a/src/sinks/datadog/logs/config.rs +++ b/src/sinks/datadog/logs/config.rs @@ -2,28 +2,30 @@ use std::{convert::TryFrom, sync::Arc}; use indoc::indoc; use tower::ServiceBuilder; -use vector_lib::configurable::configurable_component; -use vector_lib::{config::proxy::ProxyConfig, schema::meaning}; + +use vector_lib::{ + codecs::{JsonSerializerConfig, MetricTagValues}, + config::proxy::ProxyConfig, + configurable::configurable_component, + schema::meaning, +}; use vrl::value::Kind; -use super::{service::LogApiRetry, sink::LogSinkBuilder}; -use crate::common::datadog; use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, + codecs::EncodingConfigWithFraming, + common::datadog, http::HttpClient, schema, sinks::{ datadog::{logs::service::LogApiService, DatadogCommonConfig, LocalDatadogCommonConfig}, - util::{ - http::RequestConfig, service::ServiceBuilderExt, BatchConfig, Compression, - SinkBatchSettings, - }, - Healthcheck, VectorSink, + prelude::*, + util::http::RequestConfig, }, tls::{MaybeTlsSettings, TlsEnableableConfig}, }; +use super::{service::LogApiRetry, sink::LogSinkBuilder}; + // The Datadog API has a hard limit of 5MB for uncompressed payloads. Above this // threshold the API will toss results. We previously serialized Events as they // came in -- a very CPU intensive process -- and to avoid that we only batch up @@ -182,6 +184,48 @@ impl SinkConfig for DatadogLogsConfig { } } +impl ValidatableComponent for DatadogLogsConfig { + fn validation_configuration() -> ValidationConfiguration { + let endpoint = "http://127.0.0.1:9005".to_string(); + let config = Self { + local_dd_common: LocalDatadogCommonConfig { + endpoint: Some(endpoint.clone()), + default_api_key: Some("unused".to_string().into()), + ..Default::default() + }, + ..Default::default() + }; + + let encoding = EncodingConfigWithFraming::new( + None, + JsonSerializerConfig::new(MetricTagValues::Full).into(), + config.encoding.clone(), + ); + + let logs_endpoint = format!("{endpoint}/api/v2/logs"); + + let external_resource = ExternalResource::new( + ResourceDirection::Push, + HttpResourceConfig::from_parts( + http::Uri::try_from(&logs_endpoint).expect("should not fail to parse URI"), + None, + ), + encoding, + ); + + ValidationConfiguration::from_sink( + Self::NAME, + vec![ComponentTestCaseConfig::from_sink( + config, + None, + Some(external_resource), + )], + ) + } +} + +register_validatable_component!(DatadogLogsConfig); + #[cfg(test)] mod test { use super::super::config::DatadogLogsConfig; diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 0466917cbdd1b..ae399b6e9601b 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -14,7 +14,6 @@ use crate::{ codecs::{Decoder, DecodingConfig}, config::{SourceConfig, SourceContext}, http::Auth, - register_validatable_component, serde::{default_decoding, default_framing_message_based}, sources, sources::util::{ @@ -27,7 +26,7 @@ use crate::{ tls::{TlsConfig, TlsSettings}, Result, }; -use crate::{components::validation::*, sources::util::http_client}; +use crate::{components::validation::prelude::*, sources::util::http_client}; use vector_lib::codecs::{ decoding::{DeserializerConfig, FramingConfig}, StreamDecodingError, diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 255c4231d7bfd..32a1961348376 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -22,14 +22,13 @@ use vector_lib::{ use crate::{ codecs::{Decoder, DecodingConfig}, - components::validation::*, + components::validation::prelude::*, config::{ GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput, }, event::{Event, Value}, http::KeepaliveConfig, - register_validatable_component, serde::{bool_or_struct, default_decoding}, sources::util::{ http::{add_query_parameters, HttpMethod}, diff --git a/tests/validation/components/sinks/datadog_logs.yaml b/tests/validation/components/sinks/datadog_logs.yaml new file mode 100644 index 0000000000000..cf0f912d160a2 --- /dev/null +++ b/tests/validation/components/sinks/datadog_logs.yaml @@ -0,0 +1,11 @@ +- name: happy path + expectation: success + events: + - log: simple message 1 + - log: simple message 2 + - log: simple message 3 +- name: sad path + expectation: failure + events: + - external_resource_rejects: + log: simple message downstream rejects