diff --git a/src/http.rs b/src/http.rs index bcf6a41c13f38..8219f1c07977c 100644 --- a/src/http.rs +++ b/src/http.rs @@ -8,6 +8,7 @@ use headers::{Authorization, HeaderMapExt}; use http::{header::HeaderValue, request::Builder, uri::InvalidUri, HeaderMap, Request}; use hyper::{ body::{Body, HttpBody}, + client, client::{Client, HttpConnector}, }; use hyper_openssl::HttpsConnector; @@ -49,6 +50,14 @@ where pub fn new( tls_settings: impl Into, proxy_config: &ProxyConfig, + ) -> Result, HttpError> { + HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut Client::builder()) + } + + pub fn new_with_custom_client( + tls_settings: impl Into, + proxy_config: &ProxyConfig, + client_builder: &mut client::Builder, ) -> Result, HttpError> { let mut http = HttpConnector::new(); http.enforce_http(false); @@ -70,7 +79,7 @@ where proxy_config .configure(&mut proxy) .context(MakeProxyConnector)?; - let client = Client::builder().build(proxy); + let client = client_builder.build(proxy); let version = crate::get_version(); let user_agent = HeaderValue::from_str(&format!("Vector/{}", version)) diff --git a/src/rusoto/mod.rs b/src/rusoto/mod.rs index 9a3f1f94f3c85..4584e3e4e1387 100644 --- a/src/rusoto/mod.rs +++ b/src/rusoto/mod.rs @@ -34,6 +34,7 @@ use tower::{Service, ServiceExt}; pub mod auth; pub mod region; pub use auth::AwsAuthentication; +use hyper::client; pub use region::{region_from_endpoint, RegionOrEndpoint}; use rusoto_core::credential::ProfileProvider; @@ -45,6 +46,15 @@ pub fn client(proxy: &ProxyConfig) -> crate::Result { Ok(HttpClient { client }) } +pub fn custom_client( + proxy: &ProxyConfig, + client_builder: &mut client::Builder, +) -> crate::Result { + let settings = MaybeTlsSettings::enable_client()?; + let client = super::http::HttpClient::new_with_custom_client(settings, proxy, client_builder)?; + Ok(HttpClient { client }) +} + #[derive(Debug, Snafu)] enum AwsRusotoError { #[snafu(display("Failed to create request dispatcher"))] diff --git a/src/sinks/s3_common/config.rs b/src/sinks/s3_common/config.rs index fa2402a2249ac..2e0d4f7bedc71 100644 --- a/src/sinks/s3_common/config.rs +++ b/src/sinks/s3_common/config.rs @@ -5,11 +5,12 @@ use crate::sinks::util::retries::RetryLogic; use crate::sinks::Healthcheck; use futures::FutureExt; use http::StatusCode; +use hyper::client; use rusoto_core::RusotoError; use rusoto_s3::{HeadBucketRequest, PutObjectError, S3Client, S3}; use serde::{Deserialize, Serialize}; use snafu::Snafu; -use std::{collections::BTreeMap, convert::TryInto}; +use std::{collections::BTreeMap, convert::TryInto, time::Duration}; use super::service::S3Response; @@ -119,7 +120,12 @@ pub fn create_client( proxy: &ProxyConfig, ) -> crate::Result { let region = region.try_into()?; - let client = rusoto::client(proxy)?; + let client = rusoto::custom_client( + proxy, + // S3 closes idle connections after 20 seconds, + // so we can close idle connections ahead of time to prevent re-using them + client::Client::builder().pool_idle_timeout(Duration::from_secs(15)), + )?; let creds = auth.build(®ion, assume_role)?;