diff --git a/changelog.d/azure_blob_connect_proxy.feature.md b/changelog.d/azure_blob_connect_proxy.feature.md new file mode 100644 index 0000000000000..ccc915e798bcc --- /dev/null +++ b/changelog.d/azure_blob_connect_proxy.feature.md @@ -0,0 +1,3 @@ +The `azure_blob` sink now supports routing requests through HTTP/HTTPS proxies, enabling uploads from restricted networks that require an outbound proxy. + +authors: joshuacoughlan diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index bd931d3925bf2..db0521b4fa137 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -165,10 +165,11 @@ impl GenerateConfig for AzureBlobSinkConfig { #[async_trait::async_trait] #[typetag::serde(name = "azure_blob")] impl SinkConfig for AzureBlobSinkConfig { - async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { + async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + cx.proxy(), )?; let healthcheck = azure_common::config::build_healthcheck( diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 00ef11f17030a..63943312fe66d 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -32,6 +32,7 @@ async fn azure_blob_healthcheck_passed() { let client = azure_common::config::build_client( config.connection_string.clone().into(), config.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -51,6 +52,7 @@ async fn azure_blob_healthcheck_unknown_container() { let client = azure_common::config::build_client( config.connection_string.clone().into(), config.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -234,6 +236,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -251,6 +254,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); @@ -275,6 +279,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); @@ -335,6 +340,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); let result = client.create_container(None).await; diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index fd8dbd0741e13..139608d9847ab 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -129,6 +129,7 @@ pub fn build_healthcheck( pub fn build_client( connection_string: String, container_name: String, + proxy: &crate::config::ProxyConfig, ) -> crate::Result> { // Parse connection string without legacy SDK let parsed = ParsedConnectionString::parse(&connection_string) @@ -163,9 +164,35 @@ pub fn build_client( } } - // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12. + // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12 + let mut reqwest_builder = reqwest_12::ClientBuilder::new(); + let bypass_proxy = { + let host = url.host_str().unwrap_or(""); + let port = url.port(); + proxy.no_proxy.matches(host) + || port + .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p))) + .unwrap_or(false) + }; + if bypass_proxy || !proxy.enabled { + // Ensure no proxy (and disable any potential system proxy auto-detection) + reqwest_builder = reqwest_builder.no_proxy(); + } else { + if let Some(http) = &proxy.http { + let p = reqwest_12::Proxy::http(http) + .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + if let Some(https) = &proxy.https { + let p = reqwest_12::Proxy::https(https) + .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + } options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( - reqwest_12::ClientBuilder::new() + reqwest_builder .build() .map_err(|e| format!("Failed to build reqwest client: {e}"))?, ))); diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index 5724484d3c1cb..f684f19a00a65 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -243,14 +243,6 @@ impl Policy for SharedKeyAuthorizationPolicy { let (ms_date, ms_version) = self.ensure_ms_headers(request)?; // Build string to sign let sts = self.build_string_to_sign(request, &ms_date, &ms_version)?; - // // Debug string-to-sign for troubleshooting (safe: does not include key) - // let compact = sts.replace('\n', "\\n"); - // tracing::debug!( - // method = %request.method().as_str(), - // url = %request.url(), - // string_to_sign = %compact, - // "Azure shared key string_to_sign." - // ); let signature = self.sign(&sts)?; // Authorization: SharedKey {account}:{signature} diff --git a/website/cue/reference/components/sinks/azure_blob.cue b/website/cue/reference/components/sinks/azure_blob.cue index 11e70af8a81d4..7786fc65d8253 100644 --- a/website/cue/reference/components/sinks/azure_blob.cue +++ b/website/cue/reference/components/sinks/azure_blob.cue @@ -37,6 +37,7 @@ components: sinks: azure_blob: { enum: ["json", "text"] } } + proxy: enabled: true request: { enabled: true rate_limit_num: 250