Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::{
sources::util::{
http::HttpMethod,
http_client::{
build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
GenericHttpClientInputs, HttpClientBuilder,
},
},
tls::{TlsConfig, TlsSettings},
Expand Down Expand Up @@ -51,13 +52,22 @@ pub struct HttpClientConfig {
#[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
pub endpoint: String,

/// The interval between calls.
/// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
/// than the interval a new scrape will be started. This can take extra resources, set the timeout
/// to a value lower than the scrape interval to prevent this from happening.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
pub timeout: Duration,

/// Custom parameters for the HTTP request query string.
///
/// One or more values for the same parameter key can be provided.
Expand Down Expand Up @@ -153,6 +163,7 @@ impl Default for HttpClientConfig {
endpoint: "http://localhost:9898/logs".to_string(),
query: HashMap::new(),
interval: default_interval(),
timeout: default_timeout(),
decoding: default_decoding(),
framing: default_framing_message_based(),
headers: HashMap::new(),
Expand Down Expand Up @@ -193,9 +204,12 @@ impl SourceConfig for HttpClientConfig {
log_namespace,
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: self.headers.clone(),
content_type,
auth: self.auth.clone(),
Expand Down
13 changes: 12 additions & 1 deletion src/sources/http_client/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use codecs::decoding::DeserializerConfig;
use vector_core::config::log_schema;

use super::{
tests::{run_compliance, INTERVAL},
tests::{run_compliance, INTERVAL, TIMEOUT},
HttpClientConfig,
};

Expand Down Expand Up @@ -53,6 +53,7 @@ async fn invalid_endpoint() {
run_error(HttpClientConfig {
endpoint: "http://nope".to_string(),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand All @@ -71,6 +72,7 @@ async fn collected_logs_bytes() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/bytes", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
Expand All @@ -95,6 +97,7 @@ async fn collected_logs_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -119,6 +122,7 @@ async fn collected_metrics_native_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/metrics/native.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::NativeJson(Default::default()),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -148,6 +152,7 @@ async fn collected_trace_native_json() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("{}/traces/native.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::NativeJson(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -172,6 +177,7 @@ async fn unauthorized_no_auth() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -190,6 +196,7 @@ async fn unauthorized_wrong_auth() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -211,6 +218,7 @@ async fn authorized() {
run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_auth_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -232,6 +240,7 @@ async fn tls_invalid_ca() {
run_error(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_https_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -253,6 +262,7 @@ async fn tls_valid() {
run_compliance(HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_https_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand All @@ -275,6 +285,7 @@ async fn shutdown() {
let source = HttpClientConfig {
endpoint: format!("{}/logs/json.json", dufs_address()),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: default_framing_message_based(),
Expand Down
8 changes: 8 additions & 0 deletions src/sources/http_client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::test_util::{

pub(crate) const INTERVAL: Duration = Duration::from_secs(1);

pub(crate) const TIMEOUT: Duration = Duration::from_secs(1);

/// The happy path should yield at least one event and must emit the required internal events for sources.
pub(crate) async fn run_compliance(config: HttpClientConfig) -> Vec<Event> {
let events =
Expand Down Expand Up @@ -47,6 +49,7 @@ async fn bytes_decoding() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -75,6 +78,7 @@ async fn json_decoding_newline_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::NewlineDelimited(Default::default()),
Expand Down Expand Up @@ -103,6 +107,7 @@ async fn json_decoding_character_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
Expand Down Expand Up @@ -135,6 +140,7 @@ async fn request_query_applied() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint?key1=val1", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
Expand Down Expand Up @@ -203,6 +209,7 @@ async fn headers_applied() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -234,6 +241,7 @@ async fn accept_header_override() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
timeout: TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
Expand Down
23 changes: 22 additions & 1 deletion src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vector_core::{config::LogNamespace, event::Event};

use super::parser;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
http::Auth,
Expand Down Expand Up @@ -53,13 +54,22 @@ pub struct PrometheusScrapeConfig {
#[serde(alias = "hosts")]
endpoints: Vec<String>,

/// The interval between scrapes, in seconds.
/// The interval between scrapes. Requests are run concurrently so if a scrape takes longer
/// than the interval a new scrape will be started. This can take extra resources, set the timeout
/// to a value lower than the scrape interval to prevent this from happening.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
timeout: Duration,

/// The tag name added to each event representing the scraped instance's `host:port`.
///
/// The tag value is the host and port of the scraped instance.
Expand Down Expand Up @@ -114,6 +124,7 @@ impl GenerateConfig for PrometheusScrapeConfig {
toml::Value::try_from(Self {
endpoints: vec!["http://localhost:9090/metrics".to_string()],
interval: default_interval(),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -143,9 +154,12 @@ impl SourceConfig for PrometheusScrapeConfig {
endpoint_tag: self.endpoint_tag.clone(),
};

warn_if_interval_too_low(self.timeout, self.interval);

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
timeout: self.timeout,
headers: HashMap::new(),
content_type: "text/plain".to_string(),
auth: self.auth.clone(),
Expand Down Expand Up @@ -351,6 +365,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -384,6 +399,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -435,6 +451,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -500,6 +517,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -555,6 +573,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
interval: Duration::from_secs(1),
timeout: default_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -668,6 +687,7 @@ mod test {
honor_labels: false,
query: HashMap::new(),
interval: Duration::from_secs(1),
timeout: default_timeout(),
tls: None,
auth: None,
},
Expand Down Expand Up @@ -753,6 +773,7 @@ mod integration_tests {
let config = PrometheusScrapeConfig {
endpoints: vec!["http://prometheus:9090/metrics".into()],
interval: Duration::from_secs(1),
timeout: Duration::from_secs(1),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down
Loading