Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/azure_blob_connect_proxy.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `azure_blob` sink now supports routing requests through an HTTP CONNECT proxy, enabling uploads from restricted networks that require an outbound proxy.
Comment thread
joshcoughlan marked this conversation as resolved.
Outdated

authors: joshuacoughlan
3 changes: 2 additions & 1 deletion src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ impl GenerateConfig for AzureBlobSinkConfig {
#[async_trait::async_trait]
#[typetag::serde(name = "azure_blob")]
impl SinkConfig for AzureBlobSinkConfig {
async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
cx.proxy(),
)?;

let healthcheck = azure_common::config::build_healthcheck(
Expand Down
6 changes: 6 additions & 0 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async fn azure_blob_healthcheck_passed() {
let client = azure_common::config::build_client(
config.connection_string.clone().into(),
config.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand All @@ -51,6 +52,7 @@ async fn azure_blob_healthcheck_unknown_container() {
let client = azure_common::config::build_client(
config.connection_string.clone().into(),
config.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand Down Expand Up @@ -234,6 +236,7 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.expect("Failed to create client");

Expand All @@ -251,6 +254,7 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();

Expand All @@ -275,6 +279,7 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();

Expand Down Expand Up @@ -335,6 +340,7 @@ impl AzureBlobSinkConfig {
let client = azure_common::config::build_client(
self.connection_string.clone().into(),
self.container_name.clone(),
&crate::config::ProxyConfig::default(),
)
.unwrap();
let result = client.create_container(None).await;
Expand Down
31 changes: 29 additions & 2 deletions src/sinks/azure_common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub fn build_healthcheck(
pub fn build_client(
connection_string: String,
container_name: String,
proxy: &crate::config::ProxyConfig,
) -> crate::Result<Arc<BlobContainerClient>> {
// Parse connection string without legacy SDK
let parsed = ParsedConnectionString::parse(&connection_string)
Expand Down Expand Up @@ -163,9 +164,35 @@ pub fn build_client(
}
}

// Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12.
// Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12
let mut reqwest_builder = reqwest_12::ClientBuilder::new();
let bypass_proxy = {
let host = url.host_str().unwrap_or("");
let port = url.port();
proxy.no_proxy.matches(host)
|| port
.map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
.unwrap_or(false)
};
if bypass_proxy || !proxy.enabled {
// Ensure no proxy (and disable any potential system proxy auto-detection)
reqwest_builder = reqwest_builder.no_proxy();
} else {
if let Some(http) = &proxy.http {
let p = reqwest_12::Proxy::http(http)
.map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
// If credentials are embedded in the proxy URL, reqwest will handle them.
reqwest_builder = reqwest_builder.proxy(p);
}
if let Some(https) = &proxy.https {
let p = reqwest_12::Proxy::https(https)
.map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
// If credentials are embedded in the proxy URL, reqwest will handle them.
reqwest_builder = reqwest_builder.proxy(p);
}
}
options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
reqwest_12::ClientBuilder::new()
reqwest_builder
.build()
.map_err(|e| format!("Failed to build reqwest client: {e}"))?,
)));
Expand Down
8 changes: 0 additions & 8 deletions src/sinks/azure_common/shared_key_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,6 @@ impl Policy for SharedKeyAuthorizationPolicy {
let (ms_date, ms_version) = self.ensure_ms_headers(request)?;
// Build string to sign
let sts = self.build_string_to_sign(request, &ms_date, &ms_version)?;
// // Debug string-to-sign for troubleshooting (safe: does not include key)
// let compact = sts.replace('\n', "\\n");
// tracing::debug!(
// method = %request.method().as_str(),
// url = %request.url(),
// string_to_sign = %compact,
// "Azure shared key string_to_sign."
// );
let signature = self.sign(&sts)?;

// Authorization: SharedKey {account}:{signature}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ components: sinks: azure_blob: {
enum: ["json", "text"]
}
}
proxy: enabled: true
request: {
enabled: true
rate_limit_num: 250
Expand Down
Loading