Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use crate::{
sources::util::{
http::HttpMethod,
http_client::{
build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
build_url, call, default_interval, default_target_timeout, GenericHttpClientInputs,
HttpClientBuilder,
},
},
tls::{TlsConfig, TlsSettings},
Expand Down Expand Up @@ -51,13 +52,20 @@ pub struct HttpClientConfig {
#[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
pub endpoint: String,

/// The interval between calls.
/// The interval between scrapes. Requests run concurrently.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
pub interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_target_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
pub target_timeout: Duration,

/// Custom parameters for the HTTP request query string.
///
/// One or more values for the same parameter key can be provided.
Expand Down Expand Up @@ -153,6 +161,7 @@ impl Default for HttpClientConfig {
endpoint: "http://localhost:9898/logs".to_string(),
query: HashMap::new(),
interval: default_interval(),
target_timeout: default_target_timeout(),
decoding: default_decoding(),
framing: default_framing_message_based(),
headers: HashMap::new(),
Expand Down Expand Up @@ -193,9 +202,18 @@ impl SourceConfig for HttpClientConfig {
log_namespace,
};

if self.target_timeout > self.interval {
Comment thread
nullren marked this conversation as resolved.
Outdated
warn!(
interval_secs = %self.interval.as_secs_f64(),
target_timeout_secs = %self.target_timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
target_timeout: self.target_timeout,
headers: self.headers.clone(),
content_type,
auth: self.auth.clone(),
Expand Down
8 changes: 8 additions & 0 deletions src/sources/http_client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::test_util::{

/// Scrape interval (one second) assigned to the `interval` field of the `HttpClientConfig`
/// values constructed by the tests in this module.
pub(crate) const INTERVAL: Duration = Duration::from_secs(1);

/// Per-request timeout (one second) assigned to the `target_timeout` field of the
/// `HttpClientConfig` values constructed by the tests in this module.
pub(crate) const TARGET_TIMEOUT: Duration = Duration::from_secs(1);

/// The happy path should yield at least one event and must emit the required internal events for sources.
pub(crate) async fn run_compliance(config: HttpClientConfig) -> Vec<Event> {
let events =
Expand Down Expand Up @@ -47,6 +49,7 @@ async fn bytes_decoding() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -75,6 +78,7 @@ async fn json_decoding_newline_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::NewlineDelimited(Default::default()),
Expand Down Expand Up @@ -103,6 +107,7 @@ async fn json_decoding_character_delimited() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Json(Default::default()),
framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
Expand Down Expand Up @@ -135,6 +140,7 @@ async fn request_query_applied() {
let events = run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint?key1=val1", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::from([
("key1".to_string(), vec!["val2".to_string()]),
(
Expand Down Expand Up @@ -203,6 +209,7 @@ async fn headers_applied() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::new(),
decoding: default_decoding(),
framing: default_framing_message_based(),
Expand Down Expand Up @@ -234,6 +241,7 @@ async fn accept_header_override() {
run_compliance(HttpClientConfig {
endpoint: format!("http://{}/endpoint", in_addr),
interval: INTERVAL,
target_timeout: TARGET_TIMEOUT,
query: HashMap::new(),
decoding: DeserializerConfig::Bytes,
framing: default_framing_message_based(),
Expand Down
26 changes: 25 additions & 1 deletion src/sources/prometheus/scrape.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use vector_core::{config::LogNamespace, event::Event};

use super::parser;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::default_target_timeout;
use crate::{
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
http::Auth,
Expand Down Expand Up @@ -53,13 +54,20 @@ pub struct PrometheusScrapeConfig {
#[serde(alias = "hosts")]
endpoints: Vec<String>,

/// The interval between scrapes, in seconds.
/// The interval between scrapes. Requests run concurrently.
Comment thread
nullren marked this conversation as resolved.
Outdated
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,

/// The timeout for each scrape request.
#[serde(default = "default_target_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
target_timeout: Duration,

/// The tag name added to each event representing the scraped instance's `host:port`.
///
/// The tag value is the host and port of the scraped instance.
Expand Down Expand Up @@ -114,6 +122,7 @@ impl GenerateConfig for PrometheusScrapeConfig {
toml::Value::try_from(Self {
endpoints: vec!["http://localhost:9090/metrics".to_string()],
interval: default_interval(),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -143,9 +152,18 @@ impl SourceConfig for PrometheusScrapeConfig {
endpoint_tag: self.endpoint_tag.clone(),
};

if self.target_timeout > self.interval {
warn!(
interval_secs = %self.interval.as_secs_f64(),
target_timeout_secs = %self.target_timeout.as_secs_f64(),
message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
);
}
Comment thread
nullren marked this conversation as resolved.
Outdated

let inputs = GenericHttpClientInputs {
urls,
interval: self.interval,
target_timeout: self.target_timeout,
headers: HashMap::new(),
content_type: "text/plain".to_string(),
auth: self.auth.clone(),
Expand Down Expand Up @@ -351,6 +369,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -384,6 +403,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -435,6 +455,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -500,6 +521,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics", in_addr)],
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: true,
Expand Down Expand Up @@ -555,6 +577,7 @@ mod test {
let config = PrometheusScrapeConfig {
endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
instance_tag: Some("instance".to_string()),
endpoint_tag: Some("endpoint".to_string()),
honor_labels: false,
Expand Down Expand Up @@ -668,6 +691,7 @@ mod test {
honor_labels: false,
query: HashMap::new(),
interval: Duration::from_secs(1),
target_timeout: default_target_timeout(),
tls: None,
auth: None,
},
Expand Down
36 changes: 27 additions & 9 deletions src/sources/util/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
},
sources::util::http::HttpMethod,
tls::TlsSettings,
Error, SourceSender,
SourceSender,
};
use vector_common::shutdown::ShutdownSignal;
use vector_core::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
Expand All @@ -36,6 +36,8 @@ pub(crate) struct GenericHttpClientInputs {
pub urls: Vec<Uri>,
/// Interval between calls.
pub interval: Duration,
/// Timeout for the HTTP request.
pub target_timeout: Duration,
/// Map of Header+Value to apply to HTTP request.
pub headers: HashMap<String, Vec<String>>,
/// Content type of the HTTP request, determined by the source.
Expand All @@ -51,6 +53,11 @@ pub(crate) const fn default_interval() -> Duration {
Duration::from_secs(15)
}

/// Fallback timeout for a single HTTP request, used when the configuration does not
/// specify one.
///
/// Returns a [`Duration`] of five seconds.
pub(crate) const fn default_target_timeout() -> Duration {
    // Named so the unit (whole seconds) is explicit at the call below.
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}

/// Builds the context, allowing the source-specific implementation to leverage data from the
/// config and the current HTTP request.
pub(crate) trait HttpClientBuilder {
Expand Down Expand Up @@ -114,15 +121,16 @@ pub(crate) async fn call<
mut out: SourceSender,
http_method: HttpMethod,
) -> Result<(), ()> {
// Building the HttpClient should not fail as it is just setting up the client with the
// proxy and tls settings.
let client =
HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
.take_until(inputs.shutdown)
.map(move |_| stream::iter(inputs.urls.clone()))
.flatten()
.map(move |url| {
// Building the HttpClient should not fail as it is just setting up the client with the
// proxy and tls settings.
let client = HttpClient::new(inputs.tls.clone(), &inputs.proxy)
.expect("Building HTTP client failed");
let client = client.clone();
let endpoint = url.to_string();

let context_builder = context_builder.clone();
Expand Down Expand Up @@ -157,9 +165,18 @@ pub(crate) async fn call<
}

let start = Instant::now();
client
.send(request)
.map_err(Error::from)
tokio::time::timeout(inputs.target_timeout, client.send(request))
.then(move |result| async move {
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error.into()),
Err(_) => Err(format!(
"Timeout error: request exceeded {}s",
inputs.target_timeout.as_secs_f64()
)
.into()),
}
})
.and_then(|response| async move {
let (header, body) = response.into_parts();
let body = hyper::body::to_bytes(body).await?;
Expand Down Expand Up @@ -224,8 +241,9 @@ pub(crate) async fn call<
})
})
.flatten()
.boxed()
})
.flatten()
.flatten_unordered(None)
.boxed();

match out.send_event_stream(&mut stream).await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,21 @@ base: components: sources: http_client: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between calls."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,21 @@ base: components: sources: prometheus_scrape: configuration: {
}
}
scrape_interval_secs: {
description: "The interval between scrapes, in seconds."
description: "The interval between scrapes. Requests run concurrently."
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
scrape_timeout_secs: {
description: "The timeout for each scrape request."
required: false
type: float: {
default: 5.0
unit: "seconds"
}
}
tls: {
description: "TLS configuration."
required: false
Expand Down