From fe44d357a0ac8f52300208309ee36e72bb33d3d1 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Thu, 13 Nov 2025 14:51:21 -0600 Subject: [PATCH 1/6] Update Azure crates and migrate to the new SDK --- src/sinks/azure_common/connection_string.rs | 54 ++++++++++++++------- src/sinks/azure_common/shared_key_policy.rs | 8 --- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/src/sinks/azure_common/connection_string.rs b/src/sinks/azure_common/connection_string.rs index 391f9d196a9dd..7675db82accf6 100644 --- a/src/sinks/azure_common/connection_string.rs +++ b/src/sinks/azure_common/connection_string.rs @@ -99,6 +99,21 @@ pub struct ParsedConnectionString { pub development_storage_proxy_uri: Option, } +impl Default for ParsedConnectionString { + fn default() -> Self { + Self { + account_name: None, + account_key: None, + shared_access_signature: None, + default_endpoints_protocol: None, + endpoint_suffix: None, + blob_endpoint: None, + use_development_storage: false, + development_storage_proxy_uri: None, + } + } +} + impl ParsedConnectionString { /// Parse a connection string into a `ParsedConnectionString`. /// @@ -120,23 +135,28 @@ impl ParsedConnectionString { } // Build the structure from the parsed map. - let parsed = ParsedConnectionString { - account_name: map.get("accountname").cloned(), - account_key: map.get("accountkey").cloned(), - shared_access_signature: map - .get("sharedaccesssignature") - .map(|s| normalize_sas(s.as_str())), - default_endpoints_protocol: map - .get("defaultendpointsprotocol") - .map(|s| s.to_ascii_lowercase()), - endpoint_suffix: map.get("endpointsuffix").cloned(), - blob_endpoint: map.get("blobendpoint").cloned(), - use_development_storage: map - .get("usedevelopmentstorage") - .map(|v| v.eq_ignore_ascii_case("true")) - .unwrap_or(false), - development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(), - }; + let mut parsed = ParsedConnectionString::default(); + + parsed.account_name = map.get("accountname").cloned(); + parsed.account_key = map.get("accountkey").cloned(); + parsed.shared_access_signature = map + .get("sharedaccesssignature") + .map(|s| normalize_sas(s.as_str())); + + parsed.default_endpoints_protocol = map + .get("defaultendpointsprotocol") + .map(|s| s.to_ascii_lowercase()); + + parsed.endpoint_suffix = map.get("endpointsuffix").cloned(); + + parsed.blob_endpoint = map.get("blobendpoint").cloned(); + + parsed.use_development_storage = map + .get("usedevelopmentstorage") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + parsed.development_storage_proxy_uri = map.get("developmentstorageproxyuri").cloned(); Ok(parsed) } diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index 5724484d3c1cb..f684f19a00a65 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -243,14 +243,6 @@ impl Policy for SharedKeyAuthorizationPolicy { let (ms_date, ms_version) = self.ensure_ms_headers(request)?; // Build string to sign let sts = self.build_string_to_sign(request, &ms_date, &ms_version)?; - // // Debug string-to-sign for troubleshooting (safe: does not include key) - // let compact = sts.replace('\n', "\\n"); - // tracing::debug!( - // method = %request.method().as_str(), - // url = %request.url(), - // string_to_sign = %compact, - // "Azure shared key string_to_sign." - // ); let signature = self.sign(&sts)?; // Authorization: SharedKey {account}:{signature} From 808fb44fa1eca074ef8fdbe160d23e4fae26635c Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Thu, 13 Nov 2025 18:46:28 -0600 Subject: [PATCH 2/6] add proxy support for azure blob --- src/sinks/azure_blob/config.rs | 3 ++- src/sinks/azure_blob/integration_tests.rs | 6 +++++ src/sinks/azure_common/config.rs | 31 +++++++++++++++++++++-- 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index bd931d3925bf2..db0521b4fa137 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -165,10 +165,11 @@ impl GenerateConfig for AzureBlobSinkConfig { #[async_trait::async_trait] #[typetag::serde(name = "azure_blob")] impl SinkConfig for AzureBlobSinkConfig { - async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { + async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + cx.proxy(), )?; let healthcheck = azure_common::config::build_healthcheck( diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 00ef11f17030a..63943312fe66d 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -32,6 +32,7 @@ async fn azure_blob_healthcheck_passed() { let client = azure_common::config::build_client( config.connection_string.clone().into(), config.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -51,6 +52,7 @@ async fn azure_blob_healthcheck_unknown_container() { let client = azure_common::config::build_client( config.connection_string.clone().into(), config.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -234,6 +236,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .expect("Failed to create client"); @@ -251,6 +254,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); @@ -275,6 +279,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); @@ -335,6 +340,7 @@ impl AzureBlobSinkConfig { let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), + &crate::config::ProxyConfig::default(), ) .unwrap(); let result = client.create_container(None).await; diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index fd8dbd0741e13..139608d9847ab 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -129,6 +129,7 @@ pub fn build_healthcheck( pub fn build_client( connection_string: String, container_name: String, + proxy: &crate::config::ProxyConfig, ) -> crate::Result> { // Parse connection string without legacy SDK let parsed = ParsedConnectionString::parse(&connection_string) @@ -163,9 +164,35 @@ pub fn build_client( } } - // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12. + // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12 + let mut reqwest_builder = reqwest_12::ClientBuilder::new(); + let bypass_proxy = { + let host = url.host_str().unwrap_or(""); + let port = url.port(); + proxy.no_proxy.matches(host) + || port + .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p))) + .unwrap_or(false) + }; + if bypass_proxy || !proxy.enabled { + // Ensure no proxy (and disable any potential system proxy auto-detection) + reqwest_builder = reqwest_builder.no_proxy(); + } else { + if let Some(http) = &proxy.http { + let p = reqwest_12::Proxy::http(http) + .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + if let Some(https) = &proxy.https { + let p = reqwest_12::Proxy::https(https) + .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?; + // If credentials are embedded in the proxy URL, reqwest will handle them. + reqwest_builder = reqwest_builder.proxy(p); + } + } options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( - reqwest_12::ClientBuilder::new() + reqwest_builder .build() .map_err(|e| format!("Failed to build reqwest client: {e}"))?, ))); From 7621b9686990d0dd2f95db3f5ca4b1f6919f8c01 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Mon, 2 Feb 2026 20:51:58 -0600 Subject: [PATCH 3/6] resolve conflict with default impl for ParsedConnectionString --- src/sinks/azure_common/connection_string.rs | 54 +++++++-------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/src/sinks/azure_common/connection_string.rs b/src/sinks/azure_common/connection_string.rs index 7675db82accf6..391f9d196a9dd 100644 --- a/src/sinks/azure_common/connection_string.rs +++ b/src/sinks/azure_common/connection_string.rs @@ -99,21 +99,6 @@ pub struct ParsedConnectionString { pub development_storage_proxy_uri: Option, } -impl Default for ParsedConnectionString { - fn default() -> Self { - Self { - account_name: None, - account_key: None, - shared_access_signature: None, - default_endpoints_protocol: None, - endpoint_suffix: None, - blob_endpoint: None, - use_development_storage: false, - development_storage_proxy_uri: None, - } - } -} - impl ParsedConnectionString { /// Parse a connection string into a `ParsedConnectionString`. /// @@ -135,28 +120,23 @@ impl ParsedConnectionString { } // Build the structure from the parsed map. - let mut parsed = ParsedConnectionString::default(); - - parsed.account_name = map.get("accountname").cloned(); - parsed.account_key = map.get("accountkey").cloned(); - parsed.shared_access_signature = map - .get("sharedaccesssignature") - .map(|s| normalize_sas(s.as_str())); - - parsed.default_endpoints_protocol = map - .get("defaultendpointsprotocol") - .map(|s| s.to_ascii_lowercase()); - - parsed.endpoint_suffix = map.get("endpointsuffix").cloned(); - - parsed.blob_endpoint = map.get("blobendpoint").cloned(); - - parsed.use_development_storage = map - .get("usedevelopmentstorage") - .map(|v| v.eq_ignore_ascii_case("true")) - .unwrap_or(false); - - parsed.development_storage_proxy_uri = map.get("developmentstorageproxyuri").cloned(); + let parsed = ParsedConnectionString { + account_name: map.get("accountname").cloned(), + account_key: map.get("accountkey").cloned(), + shared_access_signature: map + .get("sharedaccesssignature") + .map(|s| normalize_sas(s.as_str())), + default_endpoints_protocol: map + .get("defaultendpointsprotocol") + .map(|s| s.to_ascii_lowercase()), + endpoint_suffix: map.get("endpointsuffix").cloned(), + blob_endpoint: map.get("blobendpoint").cloned(), + use_development_storage: map + .get("usedevelopmentstorage") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false), + development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(), + }; Ok(parsed) } From 64ff81de2d797d586040cc72eb7e4383966ff497 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Tue, 3 Feb 2026 13:24:13 -0600 Subject: [PATCH 4/6] Add changelog and doc update --- changelog.d/azure_blob_connect_proxy.feature.md | 3 +++ website/cue/reference/components/sinks/azure_blob.cue | 1 + 2 files changed, 4 insertions(+) create mode 100644 changelog.d/azure_blob_connect_proxy.feature.md diff --git a/changelog.d/azure_blob_connect_proxy.feature.md b/changelog.d/azure_blob_connect_proxy.feature.md new file mode 100644 index 0000000000000..d2a7c92500820 --- /dev/null +++ b/changelog.d/azure_blob_connect_proxy.feature.md @@ -0,0 +1,3 @@ +The `azure_blob` sink now supports routing requests through an HTTP CONNECT proxy, enabling uploads from restricted networks that require an outbound proxy. + +authors: joshuacoughlan \ No newline at end of file diff --git a/website/cue/reference/components/sinks/azure_blob.cue b/website/cue/reference/components/sinks/azure_blob.cue index 11e70af8a81d4..7786fc65d8253 100644 --- a/website/cue/reference/components/sinks/azure_blob.cue +++ b/website/cue/reference/components/sinks/azure_blob.cue @@ -37,6 +37,7 @@ components: sinks: azure_blob: { enum: ["json", "text"] } } + proxy: enabled: true request: { enabled: true rate_limit_num: 250 From 38ae1e35dcdafe0e2548f71ecc82acbf2e38b9fc Mon Sep 17 00:00:00 2001 From: Josh Coughlan Date: Fri, 13 Feb 2026 01:38:18 -0600 Subject: [PATCH 5/6] Update changelog.d/azure_blob_connect_proxy.feature.md Co-authored-by: Thomas --- changelog.d/azure_blob_connect_proxy.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/azure_blob_connect_proxy.feature.md b/changelog.d/azure_blob_connect_proxy.feature.md index d2a7c92500820..b79d942e36a01 100644 --- a/changelog.d/azure_blob_connect_proxy.feature.md +++ b/changelog.d/azure_blob_connect_proxy.feature.md @@ -1,3 +1,3 @@ -The `azure_blob` sink now supports routing requests through an HTTP CONNECT proxy, enabling uploads from restricted networks that require an outbound proxy. +The `azure_blob` sink now supports routing requests through HTTP/HTTPS proxies, enabling uploads from restricted networks that require an outbound proxy. authors: joshuacoughlan \ No newline at end of file From 22a80c54b7baa4b1b804da3e464f50f49ec499c3 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Fri, 13 Feb 2026 16:20:23 -0600 Subject: [PATCH 6/6] add a newline to changelog --- changelog.d/azure_blob_connect_proxy.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/azure_blob_connect_proxy.feature.md b/changelog.d/azure_blob_connect_proxy.feature.md index b79d942e36a01..ccc915e798bcc 100644 --- a/changelog.d/azure_blob_connect_proxy.feature.md +++ b/changelog.d/azure_blob_connect_proxy.feature.md @@ -1,3 +1,3 @@ The `azure_blob` sink now supports routing requests through HTTP/HTTPS proxies, enabling uploads from restricted networks that require an outbound proxy. -authors: joshuacoughlan \ No newline at end of file +authors: joshuacoughlan