Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4ef88f3
Add datadog section
StephenWakely Oct 18, 2023
8a1e9f1
Send global datadog options to sinks
StephenWakely Oct 19, 2023
202b87f
Fix tests
StephenWakely Oct 23, 2023
11c5deb
Added a test
StephenWakely Oct 25, 2023
ce39ea6
Update docs
StephenWakely Oct 25, 2023
6c7785f
Remove source feature
StephenWakely Oct 25, 2023
a1397bb
Merge remote-tracking branch 'origin' into stephen/dd_global
StephenWakely Oct 25, 2023
dea6fc8
Test local setting can override the global one
StephenWakely Oct 25, 2023
6ff4a8b
Spelling
StephenWakely Oct 25, 2023
cc1c656
Update docs
StephenWakely Oct 26, 2023
43767a6
Feedback from Doug
StephenWakely Oct 26, 2023
427d966
Merge remote-tracking branch 'origin' into OPW-43/dd_global
StephenWakely Oct 26, 2023
deeee6f
Allow datadog options to be specified by passing in the RootOpts
StephenWakely Oct 27, 2023
53c581b
Merge remote-tracking branch 'origin' into OPW-43/dd_global
StephenWakely Oct 27, 2023
86badbe
Pass datadog options through a new extra_context parameter
StephenWakely Nov 13, 2023
03a21df
Merge remote-tracking branch 'origin' into OPW-43/dd_global
StephenWakely Nov 13, 2023
7649fce
Fix extra context
StephenWakely Nov 15, 2023
e9df5ed
Tidy up a little
StephenWakely Nov 15, 2023
a16f9b4
Merge remote-tracking branch 'origin' into OPW-43/dd_global
StephenWakely Nov 15, 2023
552bc9d
Make Datadog options pub
StephenWakely Nov 16, 2023
db210a4
Updated licenses
StephenWakely Nov 17, 2023
c2837fe
Add anymap to spellings
StephenWakely Nov 17, 2023
994d8e7
Add to upgrade guide
StephenWakely Nov 20, 2023
b93667c
Feedback from Doug
StephenWakely Nov 20, 2023
2c4efd0
Spelling
StephenWakely Nov 20, 2023
5b42e79
Feedback from Bryce
StephenWakely Nov 21, 2023
d233c51
Component docs
StephenWakely Nov 21, 2023
f3684bb
Feedback from Bruce
StephenWakely Nov 27, 2023
a40d9bc
Use single_value in tests
StephenWakely Nov 29, 2023
8e67da9
Rename dd_common to local_dd_common
StephenWakely Nov 29, 2023
c33e900
Pass default for extra context when running as a Windows service
StephenWakely Dec 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ andy
ansicpg
anumber
anycondition
anymap
anypb
apievent
apipodspec
Expand Down
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ sha2 = { version = "0.10.8", default-features = false, optional = true }
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true }

# External libs
anymap = { version = "0.12", default-features = false }
arc-swap = { version = "1.6", default-features = false, optional = true }
async-compression = { version = "0.4.4", default-features = false, features = ["tokio", "gzip", "zstd"], optional = true }
apache-avro = { version = "0.16.0", default-features = false, optional = true }
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anstyle-query,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstyle-
anstyle-wincon,https://github.com/rust-cli/anstyle,MIT OR Apache-2.0,The anstyle-wincon Authors
anyhow,https://github.com/dtolnay/anyhow,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
anymap,https://github.com/chris-morgan/anymap,BlueOak-1.0.0 OR MIT OR Apache-2.0,Chris Morgan <rust@chrismorgan.info>
anymap,https://github.com/chris-morgan/anymap,MIT OR Apache-2.0,Chris Morgan <me@chrismorgan.info>
apache-avro,https://github.com/apache/avro,Apache-2.0,Apache Avro team <dev@avro.apache.org>
arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuzz Project Developers, Nick Fitzgerald <fitzgen@gmail.com>, Manish Goregaokar <manishsmail@gmail.com>, Simonas Kazlauskas <arbitrary@kazlauskas.me>, Brian L. Troutwine <brian@troutwine.us>, Corey Farwell <coreyf@rwell.org>"
arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Vaner <vorner@vorner.cz>
Expand Down
46 changes: 33 additions & 13 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::config::enterprise::{
attach_enterprise_components, report_configuration, EnterpriseError, EnterpriseMetadata,
EnterpriseReporter,
};
use crate::extra_context::ExtraContext;
#[cfg(not(feature = "enterprise-tests"))]
use crate::metrics;
#[cfg(feature = "api")]
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct ApplicationConfig {
pub api: config::api::Options,
#[cfg(feature = "enterprise")]
pub enterprise: Option<EnterpriseReporter<BoxFuture<'static, ()>>>,
pub extra_context: ExtraContext,
}

pub struct Application {
Expand All @@ -61,6 +63,7 @@ impl ApplicationConfig {
pub async fn from_opts(
opts: &RootOpts,
signal_handler: &mut SignalHandler,
extra_context: ExtraContext,
) -> Result<Self, ExitCode> {
let config_paths = opts.config_paths_with_formats();

Expand All @@ -77,12 +80,13 @@ impl ApplicationConfig {
)
.await?;

Self::from_config(config_paths, config).await
Self::from_config(config_paths, config, extra_context).await
}

pub async fn from_config(
config_paths: Vec<ConfigPath>,
config: Config,
extra_context: ExtraContext,
) -> Result<Self, ExitCode> {
// This is ugly, but needed to allow `config` to be mutable for building the enterprise
// features, but also avoid a "does not need to be mutable" warning when the enterprise
Expand All @@ -95,9 +99,10 @@ impl ApplicationConfig {
#[cfg(feature = "api")]
let api = config.api;

let (topology, graceful_crash_receiver) = RunningTopology::start_init_validated(config)
.await
.ok_or(exitcode::CONFIG)?;
let (topology, graceful_crash_receiver) =
RunningTopology::start_init_validated(config, extra_context.clone())
.await
.ok_or(exitcode::CONFIG)?;

Ok(Self {
config_paths,
Expand All @@ -108,11 +113,18 @@ impl ApplicationConfig {
api,
#[cfg(feature = "enterprise")]
enterprise,
extra_context,
})
}

pub async fn add_internal_config(&mut self, config: Config) -> Result<(), ExitCode> {
let Some((topology, _)) = RunningTopology::start_init_validated(config).await else {
pub async fn add_internal_config(
&mut self,
config: Config,
extra_context: ExtraContext,
) -> Result<(), ExitCode> {
let Some((topology, _)) =
RunningTopology::start_init_validated(config, extra_context).await
else {
return Err(exitcode::CONFIG);
};
self.internal_topologies.push(topology);
Expand Down Expand Up @@ -155,28 +167,34 @@ impl ApplicationConfig {
}

impl Application {
pub fn run() -> ExitStatus {
let (runtime, app) = Self::prepare_start().unwrap_or_else(|code| std::process::exit(code));
pub fn run(extra_context: ExtraContext) -> ExitStatus {
let (runtime, app) =
Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));

runtime.block_on(app.run())
}

pub fn prepare_start() -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare()
pub fn prepare_start(
extra_context: ExtraContext,
) -> Result<(Runtime, StartedApplication), ExitCode> {
Self::prepare(extra_context)
.and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
}

pub fn prepare() -> Result<(Runtime, Self), ExitCode> {
pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
let opts = Opts::get_matches().map_err(|error| {
// Printing to stdout/err can itself fail; ignore it.
_ = error.print();
exitcode::USAGE
})?;

Self::prepare_from_opts(opts)
Self::prepare_from_opts(opts, extra_context)
}

pub fn prepare_from_opts(opts: Opts) -> Result<(Runtime, Self), ExitCode> {
pub fn prepare_from_opts(
opts: Opts,
extra_context: ExtraContext,
) -> Result<(Runtime, Self), ExitCode> {
init_global(!opts.root.openssl_no_probe);

let color = opts.root.color.use_color();
Expand Down Expand Up @@ -205,6 +223,7 @@ impl Application {
let config = runtime.block_on(ApplicationConfig::from_opts(
&opts.root,
&mut signals.handler,
extra_context.clone(),
))?;

Ok((
Expand Down Expand Up @@ -239,6 +258,7 @@ impl Application {
require_healthy: root_opts.require_healthy,
#[cfg(feature = "enterprise")]
enterprise_reporter: config.enterprise,
extra_context: config.extra_context,
});

Ok(StartedApplication {
Expand Down
32 changes: 29 additions & 3 deletions src/common/datadog.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Functionality shared between Datadog sources and sinks.
// Allow unused imports here, since use of these functions will differ depending on the
// Datadog component type, whether it's used in integration tests, etc.
#![allow(dead_code)]
#![allow(unreachable_pub)]
use serde::{Deserialize, Serialize};
use vector_lib::event::DatadogMetricOriginMetadata;
use vector_lib::{event::DatadogMetricOriginMetadata, sensitive_string::SensitiveString};

pub const DD_US_SITE: &str = "datadoghq.com";
pub const DD_EU_SITE: &str = "datadoghq.eu";
pub(crate) const DD_US_SITE: &str = "datadoghq.com";
pub(crate) const DD_EU_SITE: &str = "datadoghq.eu";

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub(crate) struct DatadogSeriesMetric {
Expand Down Expand Up @@ -50,3 +51,28 @@ pub(crate) fn get_api_base_endpoint(endpoint: Option<&String>, site: &str) -> St
.cloned()
.unwrap_or_else(|| format!("https://api.{}", site))
}

/// Default settings to use for Datadog components.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Options {
/// Default Datadog API key to use for Datadog components.
///
/// This can also be specified with the `DD_API_KEY` environment variable.
#[derivative(Default(value = "default_api_key()"))]
pub api_key: Option<SensitiveString>,

/// Default site to use for Datadog components.
///
/// This can also be specified with the `DD_SITE` environment variable.
#[derivative(Default(value = "default_site()"))]
pub site: String,
}

fn default_api_key() -> Option<SensitiveString> {
std::env::var("DD_API_KEY").ok().map(Into::into)
}

pub(crate) fn default_site() -> String {
std::env::var("DD_SITE").unwrap_or(DD_US_SITE.to_string())
}
3 changes: 2 additions & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Modules that are common between sources and sinks.
#[cfg(any(
feature = "sources-datadog_agent",
feature = "sinks-datadog_events",
Expand All @@ -6,7 +7,7 @@
feature = "sinks-datadog_traces",
feature = "enterprise"
))]
pub(crate) mod datadog;
pub mod datadog;

#[cfg(any(
feature = "sources-aws_sqs",
Expand Down
15 changes: 13 additions & 2 deletions src/components/validation/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
codecs::Encoder,
components::validation::{RunnerMetrics, TestCase},
config::ConfigBuilder,
extra_context::ExtraContext,
topology::RunningTopology,
};

Expand Down Expand Up @@ -154,17 +155,20 @@ pub struct Runner {
configuration: ValidationConfiguration,
test_case_data_path: PathBuf,
validators: HashMap<String, Box<dyn Validator>>,
extra_context: ExtraContext,
}

impl Runner {
pub fn from_configuration(
configuration: ValidationConfiguration,
test_case_data_path: PathBuf,
extra_context: ExtraContext,
) -> Self {
Self {
configuration,
test_case_data_path,
validators: HashMap::new(),
extra_context,
}
}

Expand Down Expand Up @@ -271,7 +275,11 @@ impl Runner {
// At this point, we need to actually spawn the configured component topology so that it
// runs, and make sure we have a way to tell it when to shutdown so that we can properly
// sequence the test in terms of sending inputs, waiting for outputs, etc.
spawn_component_topology(config_builder, &topology_task_coordinator);
spawn_component_topology(
config_builder,
&topology_task_coordinator,
self.extra_context.clone(),
);
let topology_task_coordinator = topology_task_coordinator.started().await;

// Now we'll spawn two tasks: one for sending inputs, and one for collecting outputs.
Expand Down Expand Up @@ -462,6 +470,7 @@ fn build_external_resource(
fn spawn_component_topology(
config_builder: ConfigBuilder,
topology_task_coordinator: &TaskCoordinator<Configuring>,
extra_context: ExtraContext,
) {
let topology_started = topology_task_coordinator.track_started();
let topology_completed = topology_task_coordinator.track_completed();
Expand All @@ -482,7 +491,9 @@ fn spawn_component_topology(
debug!("Building component topology...");

let (topology, mut crash_rx) =
RunningTopology::start_init_validated(config).await.unwrap();
RunningTopology::start_init_validated(config, extra_context)
.await
.unwrap();

debug!("Component topology built and spawned.");
topology_started.mark_as_done();
Expand Down
27 changes: 12 additions & 15 deletions src/config/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@ use super::{
SourceOuter, TransformOuter,
};
use crate::{
common::datadog::get_api_base_endpoint,
common::datadog::{default_site, get_api_base_endpoint},
conditions::AnyCondition,
http::{HttpClient, HttpError},
sinks::{
datadog::{
default_site, logs::DatadogLogsConfig, metrics::DatadogMetricsConfig,
DatadogCommonConfig,
logs::DatadogLogsConfig, metrics::DatadogMetricsConfig, LocalDatadogCommonConfig,
},
util::{http::RequestConfig, retries::ExponentialBackoff},
},
Expand Down Expand Up @@ -461,12 +460,11 @@ fn setup_logs_reporting(

// Create a Datadog logs sink to consume and emit internal logs.
let datadog_logs = DatadogLogsConfig {
dd_common: DatadogCommonConfig {
endpoint: datadog.endpoint.clone(),
site: datadog.site.clone(),
default_api_key: api_key.into(),
..Default::default()
},
local_dd_common: LocalDatadogCommonConfig::new(
datadog.endpoint.clone(),
Some(datadog.site.clone()),
Some(api_key.into()),
),
request: RequestConfig {
headers: IndexMap::from([(
"DD-EVP-ORIGIN".to_string(),
Expand Down Expand Up @@ -568,12 +566,11 @@ fn setup_metrics_reporting(

// Create a Datadog metrics sink to consume and emit internal + host metrics.
let datadog_metrics = DatadogMetricsConfig {
dd_common: DatadogCommonConfig {
endpoint: datadog.endpoint.clone(),
site: datadog.site.clone(),
default_api_key: api_key.into(),
..Default::default()
},
local_dd_common: LocalDatadogCommonConfig::new(
datadog.endpoint.clone(),
Some(datadog.site.clone()),
Some(api_key.into()),
),
..Default::default()
};

Expand Down
3 changes: 2 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,8 @@ mod tests {
let c2 = config::load_from_str(config, format).unwrap();
match (
config::warnings(&c2),
topology::TopologyPieces::build(&c, &diff, HashMap::new()).await,
topology::TopologyPieces::build(&c, &diff, HashMap::new(), Default::default())
.await,
) {
(warnings, Ok(_pieces)) => Ok(warnings),
(_, Err(errors)) => Err(errors),
Expand Down
Loading