From a86bbbbe2a53d0d66171af63adab679eacf72b71 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Thu, 13 Nov 2025 14:51:21 -0600 Subject: [PATCH 01/13] Update Azure crates and migrate to the new SDK --- Cargo.lock | 396 +++++++----------- Cargo.toml | 19 +- src/sinks/azure_blob/config.rs | 4 +- src/sinks/azure_blob/integration_tests.rs | 102 +++-- src/sinks/azure_common/config.rs | 115 +++--- src/sinks/azure_common/connection_string.rs | 423 ++++++++++++++++++++ src/sinks/azure_common/mod.rs | 2 + src/sinks/azure_common/service.rs | 35 +- src/sinks/azure_common/shared_key_policy.rs | 217 ++++++++++ 9 files changed, 936 insertions(+), 377 deletions(-) create mode 100644 src/sinks/azure_common/connection_string.rs create mode 100644 src/sinks/azure_common/shared_key_policy.rs diff --git a/Cargo.lock b/Cargo.lock index e2f86f33a558e..6820ed5864af3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,12 +12,6 @@ dependencies = [ "regex", ] -[[package]] -name = "RustyXML" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" - [[package]] name = "addr2line" version = "0.24.2" @@ -1702,127 +1696,69 @@ dependencies = [ [[package]] name = "azure_core" -version = "0.21.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b552ad43a45a746461ec3d3a51dfb6466b4759209414b439c165eb6a6b7729e" +checksum = "f35444aeeb91e29ca82d04dbb8ad904570f837c82093454dc1989c64a33554a7" dependencies = [ + "async-lock 3.4.0", "async-trait", - "base64 0.22.1", + "azure_core_macros", "bytes 1.10.1", - "dyn-clone", "futures 0.3.31", - "getrandom 0.2.15", - "http-types", - "once_cell", "openssl", - "paste", "pin-project", - "quick-xml 0.31.0", - "rand 0.8.5", - "reqwest 0.12.9", "rustc_version", "serde", "serde_json", - "time", "tracing 0.1.41", - "url", - "uuid", + "typespec", + "typespec_client_core", ] [[package]] -name = "azure_core" -version = "0.25.0" +name = "azure_core_macros" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82c33c072c9d87777262f35abfe2a64b609437076551d4dac8373e60f0e3fde9" +checksum = "94f7cc1bbae04cfe11de9e39e2c6dc755947901e0f4e76180ab542b6deb5e15e" dependencies = [ - "async-lock 3.4.0", - "async-trait", - "bytes 1.10.1", - "futures 0.3.31", - "openssl", - "pin-project", - "rustc_version", - "serde", - "serde_json", + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", "tracing 0.1.41", - "typespec", - "typespec_client_core", ] [[package]] name = "azure_identity" -version = "0.25.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb64e97087965481c94f1703c57e678df09df73e2cdaee8952558f9c6c7d100" +checksum = "0f07bb0ee212021e75c3645e82d078e436b4b4184bde1295e9e81fcbcef923af" dependencies = [ "async-lock 3.4.0", "async-trait", - "azure_core 0.25.0", + "azure_core", "futures 0.3.31", "pin-project", "serde", "time", "tracing 0.1.41", - "typespec_client_core", "url", ] [[package]] -name = "azure_storage" -version = "0.21.0" +name = "azure_storage_blob" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59f838159f4d29cb400a14d9d757578ba495ae64feb07a7516bf9e4415127126" +checksum = "c38e589153b04af727a736f435d07b48b1a2b17df9353e5d439698fe8b718412" dependencies = [ - "RustyXML", - "async-lock 3.4.0", "async-trait", - "azure_core 0.21.0", - "bytes 1.10.1", - "serde", - "serde_derive", - "time", - "tracing 0.1.41", - "url", - "uuid", -] - -[[package]] -name = "azure_storage_blobs" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97e83c3636ae86d9a6a7962b2112e3b19eb3903915c50ce06ff54ff0a2e6a7e4" -dependencies = [ - "RustyXML", - "azure_core 0.21.0", - "azure_storage", - "azure_svc_blobstorage", - "bytes 1.10.1", - "futures 0.3.31", + "azure_core", "serde", - "serde_derive", "serde_json", - "time", - "tracing 0.1.41", + "typespec_client_core", "url", "uuid", ] -[[package]] -name = "azure_svc_blobstorage" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e6c6f20c5611b885ba94c7bae5e02849a267381aecb8aee577e8c35ff4064c6" -dependencies = [ - "azure_core 0.21.0", - "bytes 1.10.1", - "futures 0.3.31", - "log", - "once_cell", - "serde", - "serde_json", - "time", -] - [[package]] name = "backoff" version = "0.4.0" @@ -2662,7 +2598,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -3345,7 +3281,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.4", "percent-encoding", - "reqwest 0.12.9", + "reqwest 0.12.24", "semver", "serde", "serde_json", @@ -3399,12 +3335,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.9" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" dependencies = [ "powerfmt", - "serde", + "serde_core", ] [[package]] @@ -4482,17 +4418,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "getrandom" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" -dependencies = [ - "cfg-if", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - [[package]] name = "getrandom" version = "0.2.15" @@ -4502,7 +4427,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -4581,7 +4506,7 @@ dependencies = [ "arc-swap", "futures 0.3.31", "log", - "reqwest 0.12.9", + "reqwest 0.12.24", "serde", "serde_derive", "serde_json", @@ -5224,26 +5149,6 @@ dependencies = [ "serde", ] -[[package]] -name = "http-types" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" -dependencies = [ - "anyhow", - "async-channel 1.9.0", - "base64 0.13.1", - "futures-lite 1.13.0", - "infer", - "pin-project-lite", - "rand 0.7.3", - "serde", - "serde_json", - "serde_qs", - "serde_urlencoded", - "url", -] - [[package]] name = "httparse" version = "1.10.1" @@ -5469,21 +5374,28 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" dependencies = [ + "base64 0.22.1", "bytes 1.10.1", "futures-channel", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.0", "hyper 1.7.0", + "ipnet", + "libc", + "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.0", + "system-configuration 0.6.1", "tokio", "tower-service", "tracing 0.1.41", + "windows-registry", ] [[package]] @@ -5712,12 +5624,6 @@ version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd" -[[package]] -name = "infer" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" - [[package]] name = "influxdb-line-protocol" version = "2.0.0" @@ -5859,6 +5765,16 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf370abdafd54d13e54a620e8c3e1145f28e46cc9d704bc6d94414559df41763" +[[package]] +name = "iri-string" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is-terminal" version = "0.4.9" @@ -6865,7 +6781,7 @@ dependencies = [ "hermit-abi 0.3.9", "libc", "log", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "windows-sys 0.52.0", ] @@ -7672,7 +7588,7 @@ dependencies = [ "md-5", "percent-encoding", "quick-xml 0.37.4", - "reqwest 0.12.9", + "reqwest 0.12.24", "serde", "serde_json", "tokio", @@ -8777,7 +8693,7 @@ dependencies = [ "libc", "once_cell", "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", + "wasi", "web-sys", "winapi", ] @@ -8805,9 +8721,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.31.0" +version = "0.37.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +checksum = "a4ce8c88de324ff838700f36fb6ab86c96df0e3c4ab6ef3a9b2044465cce1369" dependencies = [ "memchr", "serde", @@ -8815,9 +8731,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.37.4" +version = "0.38.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4ce8c88de324ff838700f36fb6ab86c96df0e3c4ab6ef3a9b2044465cce1369" +checksum = "b66c2058c55a409d601666cffe35f04333cf1013010882cec174a7467cd4e21c" dependencies = [ "memchr", "serde", @@ -8943,19 +8859,6 @@ dependencies = [ "nibble_vec", ] -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom 0.1.16", - "libc", - "rand_chacha 0.2.2", - "rand_core 0.5.1", - "rand_hc", -] - [[package]] name = "rand" version = "0.8.5" @@ -8977,16 +8880,6 @@ dependencies = [ "rand_core 0.9.0", ] -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core 0.5.1", -] - [[package]] name = "rand_chacha" version = "0.3.1" @@ -9007,15 +8900,6 @@ dependencies = [ "rand_core 0.9.0", ] -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom 0.1.16", -] - [[package]] name = "rand_core" version = "0.6.4" @@ -9045,15 +8929,6 @@ dependencies = [ "rand 0.9.2", ] -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core 0.5.1", -] - [[package]] name = "rand_xorshift" version = "0.4.0" @@ -9383,7 +9258,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", - "system-configuration", + "system-configuration 0.5.1", "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", @@ -9398,15 +9273,15 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.9" +version = "0.12.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" dependencies = [ - "async-compression", "base64 0.22.1", "bytes 1.10.1", "cookie", "cookie_store", + "encoding_rs", "futures-channel", "futures-core", "futures-util", @@ -9418,19 +9293,16 @@ dependencies = [ "hyper-rustls 0.27.5", "hyper-tls 0.6.0", "hyper-util", - "ipnet", "js-sys", "log", "mime", "mime_guess", "native-tls", - "once_cell", "percent-encoding", "pin-project-lite", "quinn", "rustls 0.23.23", "rustls-native-certs 0.8.1", - "rustls-pemfile 2.1.0", "rustls-pki-types", "serde", "serde_json", @@ -9440,14 +9312,15 @@ dependencies = [ "tokio-native-tls", "tokio-rustls 0.26.2", "tokio-util", + "tower 0.5.2", + "tower-http 0.6.6", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 0.26.1", - "windows-registry", + "webpki-roots 1.0.4", ] [[package]] @@ -9459,7 +9332,7 @@ dependencies = [ "anyhow", "async-trait", "http 1.3.1", - "reqwest 0.12.9", + "reqwest 0.12.24", "serde", "thiserror 1.0.68", "tower-service", @@ -9478,7 +9351,7 @@ dependencies = [ "http 1.3.1", "hyper 1.7.0", "parking_lot 0.11.2", - "reqwest 0.12.9", + "reqwest 0.12.24", "reqwest-middleware", "retry-policies", "thiserror 1.0.68", @@ -10234,17 +10107,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_qs" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" -dependencies = [ - "percent-encoding", - "serde", - "thiserror 1.0.68", -] - [[package]] name = "serde_repr" version = "0.1.17" @@ -11150,7 +11012,18 @@ checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ "bitflags 1.3.2", "core-foundation 0.9.3", - "system-configuration-sys", + "system-configuration-sys 0.5.0", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags 2.9.0", + "core-foundation 0.9.3", + "system-configuration-sys 0.6.0", ] [[package]] @@ -11163,6 +11036,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -11340,9 +11223,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.36" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", "itoa", @@ -11358,15 +11241,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" [[package]] name = "time-macros" -version = "0.2.18" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" dependencies = [ "num-conv", "time-core", @@ -11903,6 +11786,24 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "tower-http" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +dependencies = [ + "bitflags 2.9.0", + "bytes 1.10.1", + "futures-util", + "http 1.3.1", + "http-body 1.0.0", + "iri-string", + "pin-project-lite", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" @@ -12213,11 +12114,14 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "typespec" -version = "0.5.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04c7a952f1f34257f945fc727b20defe7a3c01c05ddd42925977626cfa6e62ab" +checksum = "44f91ea93fdd5fd4985fcc0a197ed8e8da18705912bef63c9b9b3148d6f35510" dependencies = [ "base64 0.22.1", + "bytes 1.10.1", + "futures 0.3.31", + "quick-xml 0.38.4", "serde", "serde_json", "url", @@ -12225,19 +12129,18 @@ dependencies = [ [[package]] name = "typespec_client_core" -version = "0.4.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5879ce67ba9e525fe088c882ede1337c32c3f80e83e72d9fd3cc6c8e05bcb3d7" +checksum = "1a0f6f7345c3389663551a64fc4dca78fa9689ece758c5ca76e82d6da69349dc" dependencies = [ "async-trait", "base64 0.22.1", - "bytes 1.10.1", "dyn-clone", "futures 0.3.31", - "getrandom 0.2.15", + "getrandom 0.3.4", "pin-project", - "rand 0.8.5", - "reqwest 0.12.9", + "rand 0.9.2", + "reqwest 0.12.24", "serde", "serde_json", "time", @@ -12251,9 +12154,9 @@ dependencies = [ [[package]] name = "typespec_macros" -version = "0.4.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6cbccdbe531c8d553812a609bdb70c0d1002ad91333498e18df42c98744b15cc" +checksum = "6ecee5b05c459ea4cd97df7685db58699c32e070465f88dc806d7c98a5088edc" dependencies = [ "proc-macro2 1.0.101", "quote 1.0.40", @@ -12587,11 +12490,9 @@ dependencies = [ "aws-smithy-types", "aws-types", "axum 0.6.20", - "azure_core 0.21.0", - "azure_core 0.25.0", + "azure_core", "azure_identity", - "azure_storage", - "azure_storage_blobs", + "azure_storage_blob", "base64 0.22.1", "bloomy", "bollard", @@ -12693,6 +12594,7 @@ dependencies = [ "redis", "regex", "reqwest 0.11.26", + "reqwest 0.12.24", "rmp-serde", "rmpv", "roaring", @@ -13241,7 +13143,7 @@ dependencies = [ "quoted_printable", "rand 0.8.5", "regex", - "reqwest 0.12.9", + "reqwest 0.12.24", "reqwest-middleware", "reqwest-retry", "roxmltree", @@ -13351,12 +13253,6 @@ dependencies = [ "tracing 0.1.41", ] -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -13528,6 +13424,15 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -13657,8 +13562,8 @@ dependencies = [ "windows-implement", "windows-interface", "windows-link 0.1.0", - "windows-result 0.3.1", - "windows-strings 0.3.1", + "windows-result", + "windows-strings", ] [[package]] @@ -13717,22 +13622,13 @@ dependencies = [ [[package]] name = "windows-registry" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" -dependencies = [ - "windows-result 0.2.0", - "windows-strings 0.1.0", - "windows-targets 0.52.6", -] - -[[package]] -name = "windows-result" -version = "0.2.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +checksum = "6c44a98275e31bfd112bb06ba96c8ab13c03383a3753fdddd715406a1824c7e0" dependencies = [ - "windows-targets 0.52.6", + "windows-link 0.1.0", + "windows-result", + "windows-strings", ] [[package]] @@ -13755,16 +13651,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "windows-strings" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" -dependencies = [ - "windows-result 0.2.0", - "windows-targets 0.52.6", -] - [[package]] name = "windows-strings" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 102ae70602f00..9e6c8fe426d71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -209,6 +209,7 @@ serial_test = { version = "3.2" } [dependencies] cfg-if.workspace = true +reqwest_0_12_24 = { package = "reqwest", version = "0.12.24", features = ["json"] } clap.workspace = true indoc.workspace = true paste.workspace = true @@ -287,15 +288,11 @@ aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional aws-smithy-types = { version = "1.2.11", default-features = false, features = ["rt-tokio"], optional = true } # Azure -azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl"], optional = true } -azure_identity = { version = "0.25", default-features = false, features = ["reqwest"], optional = true } +azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] } +azure_identity = { version = "0.30", default-features = false, optional = true } # Azure Storage -azure_storage = { version = "0.21", default-features = false, optional = true } -azure_storage_blobs = { version = "0.21", default-features = false, optional = true } - -# Needed to bridge with outdated version of azure_core used in azure_storage* -azure_core_for_storage = { package = "azure_core", version = "0.21.0", default-features = false, features = ["enable_reqwest", "hmac_openssl"] } +azure_storage_blob = { version = "0.7", default-features = false, optional = true } # OpenDAL @@ -464,10 +461,8 @@ openssl-src = { version = "300", default-features = false, features = ["force-en approx = "0.5.1" assert_cmd = { version = "2.0.17", default-features = false } aws-smithy-runtime = { version = "1.8.3", default-features = false, features = ["tls-rustls"] } -azure_core = { version = "0.25", default-features = false, features = ["reqwest", "hmac_openssl", "azurite_workaround"] } -azure_identity = { version = "0.25", default-features = false, features = ["reqwest"] } -azure_storage = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl"] } -azure_storage_blobs = { version = "0.21", default-features = false, features = ["enable_reqwest", "hmac_openssl", "azurite_workaround"] } +azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] } +azure_identity = { version = "0.30", default-features = false } base64 = "0.22.1" criterion = { version = "0.7.0", features = ["html_reports", "async_tokio"] } itertools.workspace = true @@ -853,7 +848,7 @@ sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"] sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] -sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"] +sinks-azure_blob = ["dep:azure_storage_blob"] sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 3c2437c554106..bd931d3925bf2 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use azure_storage_blobs::prelude::*; +use azure_storage_blob::BlobContainerClient; use tower::ServiceBuilder; use vector_lib::{ codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer}, @@ -193,7 +193,7 @@ const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s"; const DEFAULT_FILENAME_APPEND_UUID: bool = true; impl AzureBlobSinkConfig { - pub fn build_processor(&self, client: Arc) -> crate::Result { + pub fn build_processor(&self, client: Arc) -> crate::Result { let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, AzureBlobRetryLogic) diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index a48b330982eb5..1ffa70976a697 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -4,8 +4,8 @@ use std::{ }; use azure_core::http::StatusCode; -use azure_core_for_storage::prelude::Range; -use azure_storage_blobs::prelude::*; + +use azure_storage_blob::BlobContainerClient; use bytes::{Buf, BytesMut}; use flate2::read::GzDecoder; use futures::{Stream, StreamExt, stream}; @@ -83,8 +83,8 @@ async fn azure_blob_insert_lines_into_blob() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log")); - let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await; - assert_eq!(blob.properties.content_type, String::from("text/plain")); + let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!(content_type, Some(String::from("text/plain"))); assert_eq!(lines, blob_lines); } @@ -108,12 +108,9 @@ async fn azure_blob_insert_json_into_blob() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log")); - let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await; - assert_eq!(blob.properties.content_encoding, None); - assert_eq!( - blob.properties.content_type, - String::from("application/x-ndjson") - ); + let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!(content_encoding, None); + assert_eq!(content_type, Some(String::from("application/x-ndjson"))); let expected = events .iter() .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap()) @@ -262,45 +259,65 @@ impl AzureBlobSinkConfig { self.container_name.clone(), ) .unwrap(); - let response = client - .list_blobs() - .prefix(prefix) - .max_results(NonZeroU32::new(1000).unwrap()) - .delimiter("/") - .include_metadata(true) - .into_stream() + + // Use new SDK pager to fetch first page and collect blob names. + let mut pager = client + .list_blobs(None) + .expect("Failed to start list blobs pager"); + let page = pager .next() .await .expect("Failed to fetch blobs") - .unwrap(); - - response - .blobs - .blobs() - .map(|blob| blob.name.clone()) - .collect::>() + .into_body(); + + // Best-effort extraction of names from the page body. + // Depending on SDK struct names, this may need tweaking: + // ListBlobsFlatSegmentResponse { segment: { blob_items: [{ name, .. }, ..] }, .. } + let names = page + .segment + .blob_items + .into_iter() + .map(|b| b.name) + .filter(|name| name.starts_with(&prefix)) + .collect::>(); + + names } - pub async fn get_blob(&self, blob: String) -> (Blob, Vec) { + pub async fn get_blob(&self, blob: String) -> (Option, Option, Vec) { + use azure_storage_blob::clients::BlobClient as _; let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), ) .unwrap(); - let response = client - .blob_client(blob) - .get() - .range(Range::new(0, 1024 * 1024)) - .into_stream() - .next() + + let blob_client = client.blob_client(&blob); + + // Fetch properties to obtain content-type and content-encoding + let props = blob_client + .get_properties(None) .await - .expect("Failed to get blob") - .unwrap(); + .expect("Failed to get blob properties") + .into_body(); - ( - response.blob, - self.get_blob_content(response.data.collect().await.unwrap().to_vec()), - ) + let content_type = props.content_type.clone(); + let content_encoding = props.content_encoding.clone(); + + // Download blob content (full or first MB as needed) + let downloaded = blob_client + .download(None) + .await + .expect("Failed to download blob"); + let data = downloaded + .into_body() + .data + .collect() + .await + .expect("Failed to read blob body") + .to_vec(); + + (content_type, content_encoding, self.get_blob_content(data)) } fn get_blob_content(&self, data: Vec) -> Vec { @@ -322,15 +339,12 @@ impl AzureBlobSinkConfig { self.container_name.clone(), ) .unwrap(); - let request = client - .create() - .public_access(PublicAccess::None) - .into_future(); + let result = client.create_container(None).await; - let response = match request.await { + let response = match result { Ok(_) => Ok(()), - Err(error) => match error.as_http_error() { - Some(http_error) if http_error.status() as u16 == StatusCode::Conflict => Ok(()), + Err(error) => match error.http_status() { + Some(status) if status.as_u16() == StatusCode::Conflict => Ok(()), _ => Err(error), }, }; diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index fadd0e2c4707c..5fe7c7ede2596 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -1,12 +1,15 @@ use std::sync::Arc; -use azure_core::error::HttpError; -use azure_core_for_storage::RetryOptions; -use azure_storage::{CloudLocation, ConnectionString}; -use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*}; +use azure_core::error::Error as AzureCoreError; + +use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString}; +use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy; +use azure_core::http::Url; +use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions}; + +use azure_core::http::StatusCode; use bytes::Bytes; use futures::FutureExt; -use http::StatusCode; use snafu::Snafu; use vector_lib::{ json_size::JsonSize, @@ -56,19 +59,21 @@ pub struct AzureBlobMetadata { pub struct AzureBlobRetryLogic; impl RetryLogic for AzureBlobRetryLogic { - type Error = HttpError; + type Error = AzureCoreError; type Request = AzureBlobRequest; type Response = AzureBlobResponse; fn is_retriable_error(&self, error: &Self::Error) -> bool { - error.status().is_server_error() - || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::::into(error.status()) + match error.http_status() { + Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests, + None => false, + } } } #[derive(Debug)] pub struct AzureBlobResponse { - pub inner: PutBlockBlobResponse, + pub inner: (), pub events_byte_size: GroupedCountByteSize, pub byte_size: usize, } @@ -99,24 +104,22 @@ pub enum HealthcheckError { pub fn build_healthcheck( container_name: String, - client: Arc, + client: Arc, ) -> crate::Result { let healthcheck = async move { - let response = client.get_properties().into_future().await; - - let resp: crate::Result<()> = match response { + let resp: crate::Result<()> = match client.get_properties(None).await { Ok(_) => Ok(()), - Err(error) => Err(match error.as_http_error() { - Some(err) => match StatusCode::from_u16(err.status().into()) { - Ok(StatusCode::FORBIDDEN) => Box::new(HealthcheckError::InvalidCredentials), - Ok(StatusCode::NOT_FOUND) => Box::new(HealthcheckError::UnknownContainer { + Err(error) => { + let code = error.http_status(); + Err(match code { + Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials), + Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer { container: container_name, }), - Ok(status) => Box::new(HealthcheckError::Unknown { status }), - Err(_) => "unknown status code".into(), - }, - _ => error.into(), - }), + Some(status) => Box::new(HealthcheckError::Unknown { status }), + None => "unknown status code".into(), + }) + } }; resp }; @@ -127,33 +130,47 @@ pub fn build_healthcheck( pub fn build_client( connection_string: String, container_name: String, -) -> crate::Result> { - let client = { - let connection_string = ConnectionString::new(&connection_string)?; - let account_name = connection_string - .account_name - .ok_or("Account name missing in connection string")?; - - match connection_string.blob_endpoint { - // When the blob_endpoint is provided, we use the Custom CloudLocation since it is - // required to contain the full URI to the blob storage API endpoint, this means - // that account_name is not required to exist in the connection_string since - // account_name is only used with the default CloudLocation in the Azure SDK to - // generate the storage API endpoint - Some(uri) => ClientBuilder::with_location( - CloudLocation::Custom { - uri: uri.to_string(), - account: account_name.to_string(), - }, - connection_string.storage_credentials()?, - ), - // Without a valid blob_endpoint in the connection_string, assume we are in Azure - // Commercial (AzureCloud location) and create a default Blob Storage Client that - // builds the API endpoint location using the account_name as input - None => ClientBuilder::new(account_name, connection_string.storage_credentials()?), +) -> crate::Result> { + // Parse connection string without legacy SDK + let parsed = ParsedConnectionString::parse(&connection_string) + .map_err(|e| format!("Invalid connection string: {e}"))?; + // Compose container URL (SAS appended if present) + let container_url = parsed + .container_url(&container_name) + .map_err(|e| format!("Failed to build container URL: {e}"))?; + let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?; + + // Prepare options; attach Shared Key policy if needed + let mut options = BlobContainerClientOptions::default(); + match parsed.auth() { + Auth::Sas { .. } | Auth::None => { + // No extra policy; SAS is in the URL already (or anonymous) } - .retry(RetryOptions::none()) - .container_client(container_name) - }; + Auth::SharedKey { + account_name, + account_key, + } => { + let policy = SharedKeyAuthorizationPolicy::new( + account_name, + account_key, + // Use the same version as the generated client defaults + String::from("2025-11-05"), + ) + .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?; + options + .client_options + .per_call_policies + .push(Arc::new(policy)); + } + } + + // Force Azure SDK to use reqwest_0_12_24 transport to avoid affecting global reqwest + options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( + reqwest_0_12_24::ClientBuilder::new() + .build() + .map_err(|e| format!("Failed to build reqwest_0_12_24 client: {e}"))?, + ))); + let client = + BlobContainerClient::from_url(url, None, Some(options)).map_err(|e| format!("{e}"))?; Ok(Arc::new(client)) } diff --git a/src/sinks/azure_common/connection_string.rs b/src/sinks/azure_common/connection_string.rs new file mode 100644 index 0000000000000..990dcd99d6015 --- /dev/null +++ b/src/sinks/azure_common/connection_string.rs @@ -0,0 +1,423 @@ +/*! +Minimal Azure Storage connection string parser and URL builder for Blob Storage. + +This module intentionally avoids relying on the legacy Azure Storage SDK crates. +It extracts only the fields we need and composes container/blob URLs suitable +for the newer `azure_storage_blob` crate (>= 0.7). + +Supported keys (case-insensitive): +- AccountName +- AccountKey +- SharedAccessSignature +- DefaultEndpointsProtocol +- EndpointSuffix +- BlobEndpoint +- UseDevelopmentStorage +- DevelopmentStorageProxyUri + +Behavior +- If `BlobEndpoint` is present, it is used as the base for container/blob URLs. + It may already include the account segment (e.g., Azurite: http://127.0.0.1:10000/devstoreaccount1). +- Otherwise, if `UseDevelopmentStorage=true`, we synthesize a dev endpoint: + `{protocol}://127.0.0.1:10000/{account_name}`, with `protocol` default `http` if unspecified. + If `DevelopmentStorageProxyUri` is present, it replaces the host/port while still appending + the account name path segment. +- Otherwise, we synthesize the public cloud endpoint: + `{protocol}://{account_name}.blob.{endpoint_suffix}` where `endpoint_suffix` defaults to `core.windows.net` + and `protocol` defaults to `https`. + +SAS handling +- If `SharedAccessSignature` exists, it will be appended to the generated URLs as a query string. + Both `sv=...` and `?sv=...` forms are accepted; the leading '?' is normalized. + +Examples: +- Access key connection string: + "DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64key==;EndpointSuffix=core.windows.net" + Container URL: https://myacct.blob.core.windows.net/logs + Blob URL: https://myacct.blob.core.windows.net/logs/file.txt + +- SAS connection string: + "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&..." + Container URL (with SAS): https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&... + Blob URL (with SAS): https://myacct.blob.core.windows.net/logs/file.txt?sv=2022-11-02&ss=b&... + +- Azurite/dev storage: + "UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1" + Container URL: http://127.0.0.1:10000/devstoreaccount1/logs +*/ + +use std::collections::HashMap; + +/// Errors that can occur while parsing a connection string or composing URLs. +#[derive(Debug, Clone)] +pub enum ConnectionStringError { + InvalidFormat(&'static str), + InvalidPair(String), + MissingAccountName, + MissingEndpoint, +} + +impl std::fmt::Display for ConnectionStringError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ConnectionStringError::InvalidFormat(msg) => write!(f, "invalid format: {msg}"), + ConnectionStringError::InvalidPair(p) => write!(f, "invalid key=value pair: {p}"), + ConnectionStringError::MissingAccountName => write!(f, "account name is required"), + ConnectionStringError::MissingEndpoint => { + write!(f, "could not determine Blob endpoint") + } + } + } +} + +impl std::error::Error for ConnectionStringError {} + +/// Represents the type of authentication present in the connection string. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Auth { + /// Shared key-based authentication (account key). + SharedKey { + account_name: String, + account_key: String, // base64-encoded account key as provided + }, + /// Shared access signature provided as query string (without the leading `?`). + Sas { query: String }, + /// No credentials present. + None, +} + +/// A parsed Azure Storage connection string and helpers to compose URLs for containers/blobs. +#[derive(Debug, Clone)] +pub struct ParsedConnectionString { + pub account_name: Option, + pub account_key: Option, + pub shared_access_signature: Option, + pub default_endpoints_protocol: Option, + pub endpoint_suffix: Option, + pub blob_endpoint: Option, + pub use_development_storage: bool, + pub development_storage_proxy_uri: Option, +} + +impl Default for ParsedConnectionString { + fn default() -> Self { + Self { + account_name: None, + account_key: None, + shared_access_signature: None, + default_endpoints_protocol: None, + endpoint_suffix: None, + blob_endpoint: None, + use_development_storage: false, + development_storage_proxy_uri: None, + } + } +} + +impl ParsedConnectionString { + /// Parse a connection string into a `ParsedConnectionString`. + /// + /// The parser is case-insensitive for keys and ignores empty segments. + pub fn parse(s: &str) -> Result { + let mut map: HashMap = HashMap::new(); + + for seg in s.split(';') { + let seg = seg.trim(); + if seg.is_empty() { + continue; + } + let (k, v) = seg + .split_once('=') + .ok_or_else(|| ConnectionStringError::InvalidPair(seg.to_string()))?; + let key = k.trim().to_ascii_lowercase(); + let value = v.trim().to_string(); + map.insert(key, value); + } + + // Build the structure from the parsed map. + let mut parsed = ParsedConnectionString::default(); + + parsed.account_name = map.get("accountname").cloned(); + parsed.account_key = map.get("accountkey").cloned(); + parsed.shared_access_signature = map + .get("sharedaccesssignature") + .map(|s| normalize_sas(s.as_str())); + + parsed.default_endpoints_protocol = map + .get("defaultendpointsprotocol") + .map(|s| s.to_ascii_lowercase()); + + parsed.endpoint_suffix = map.get("endpointsuffix").cloned(); + + parsed.blob_endpoint = map.get("blobendpoint").cloned(); + + parsed.use_development_storage = map + .get("usedevelopmentstorage") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + parsed.development_storage_proxy_uri = map.get("developmentstorageproxyuri").cloned(); + + Ok(parsed) + } + + /// Determine the authentication method present in this connection string. + pub fn auth(&self) -> Auth { + if let (Some(name), Some(key)) = (self.account_name.as_ref(), self.account_key.as_ref()) { + return Auth::SharedKey { + account_name: name.clone(), + account_key: key.clone(), + }; + } + if let Some(sas) = self.shared_access_signature.as_ref() { + return Auth::Sas { query: sas.clone() }; + } + Auth::None + } + + /// Get the normalized default protocol, defaulting to: + /// - http for development storage + /// - https otherwise + pub fn default_protocol(&self) -> String { + if let Some(p) = self.default_endpoints_protocol.as_deref() { + match p { + "http" | "https" => p.to_string(), + _ => { + // Fallbacks + if self.use_development_storage { + "http".to_string() + } else { + "https".to_string() + } + } + } + } else if self.use_development_storage { + "http".to_string() + } else { + "https".to_string() + } + } + + /// Get the normalized endpoint suffix, defaulting to "core.windows.net". + pub fn endpoint_suffix(&self) -> String { + self.endpoint_suffix + .clone() + .unwrap_or_else(|| "core.windows.net".to_string()) + } + + /// Build the base Blob endpoint URL (no container/blob path). + /// + /// Resolution order: + /// 1. BlobEndpoint (as-is, without trailing slash normalization) + /// 2. Development storage synthesized URL: `{proto}://127.0.0.1:10000/{account}` + /// If DevelopmentStorageProxyUri is present, it will be used instead of 127.0.0.1:10000. + /// 3. Public cloud synthesized URL: `{proto}://{account}.blob.{suffix}` + pub fn blob_account_endpoint(&self) -> Result { + if let Some(explicit) = self.blob_endpoint.as_ref() { + return Ok(explicit.clone()); + } + + let account_name = self + .account_name + .as_ref() + .ok_or(ConnectionStringError::MissingAccountName)?; + + let proto = self.default_protocol(); + + if self.use_development_storage { + // If the proxy URI is provided, use it. Otherwise default to 127.0.0.1:10000 + let host = self + .development_storage_proxy_uri + .as_deref() + .map(|s| s.trim_end_matches('/').to_string()) + .unwrap_or_else(|| "127.0.0.1:10000".to_string()); + + let base = if host.starts_with("http://") || host.starts_with("https://") { + format!("{}/{}", trim_trailing_slash(&host), account_name) + } else { + format!("{proto}://{host}/{}", account_name) + }; + return Ok(base); + } + + // Public cloud-style base + let suffix = self.endpoint_suffix(); + Ok(format!("{proto}://{}.blob.{}", account_name, suffix)) + } + + /// Build a container URL, optionally appending SAS if present. + pub fn container_url(&self, container: &str) -> Result { + let base = self.blob_account_endpoint()?; + Ok(append_query_segment( + &format!("{}/{}", trim_trailing_slash(&base), container), + self.shared_access_signature.as_deref(), + )) + } + + /// Build a blob URL, optionally appending SAS if present. + pub fn blob_url(&self, container: &str, blob: &str) -> Result { + let container_url = self.container_url(container)?; + Ok(append_query_segment( + &format!( + "{}/{}", + trim_trailing_slash(&container_url), + encode_path_segment(blob) + ), + // container_url already handled SAS; if it already had query args, append with '&' + None, // SAS already appended at container level if present + )) + } +} + +/// Normalize a SAS string by removing any leading '?'. +fn normalize_sas(s: &str) -> String { + s.trim_start_matches('?').to_string() +} + +/// Append a query segment `sas` to `base_url`, respecting whether `base_url` already has a query. +fn append_query_segment(base_url: &str, sas: Option<&str>) -> String { + match sas { + None => base_url.to_string(), + Some(q) if q.is_empty() => base_url.to_string(), + Some(q) => { + let sep = if base_url.contains('?') { '&' } else { '?' }; + format!("{base_url}{sep}{q}") + } + } +} + +/// Trim exactly one trailing slash from a string, if present. +fn trim_trailing_slash(s: &str) -> String { + if s.ends_with('/') { + s[..s.len() - 1].to_string() + } else { + s.to_string() + } +} + +/// Encode a path segment minimally (only slash needs special handling for our cases). +/// For our purposes (blob names generated by Vector), we only replace spaces with %20. +/// This avoids pulling an extra crate; refine if needed in the future. +fn encode_path_segment(seg: &str) -> String { + seg.replace(' ', "%20") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_access_key_public_cloud() { + let cs = "DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64==;EndpointSuffix=core.windows.net"; + let parsed = ParsedConnectionString::parse(cs).unwrap(); + assert_eq!(parsed.account_name.as_deref(), Some("myacct")); + assert_eq!(parsed.account_key.as_deref(), Some("base64==")); + assert!(parsed.shared_access_signature.is_none()); + assert_eq!(parsed.default_protocol(), "https"); + assert_eq!(parsed.endpoint_suffix(), "core.windows.net"); + + let base = parsed.blob_account_endpoint().unwrap(); + assert_eq!(base, "https://myacct.blob.core.windows.net"); + + let container_url = parsed.container_url("logs").unwrap(); + assert_eq!(container_url, "https://myacct.blob.core.windows.net/logs"); + + let blob_url = parsed.blob_url("logs", "file.txt").unwrap(); + assert_eq!( + blob_url, + "https://myacct.blob.core.windows.net/logs/file.txt" + ); + assert_eq!( + parsed.auth(), + Auth::SharedKey { + account_name: "myacct".to_string(), + account_key: "base64==".to_string() + } + ); + } + + #[test] + fn parse_sas_with_blob_endpoint() { + let cs = "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..."; + let parsed = ParsedConnectionString::parse(cs).unwrap(); + assert_eq!( + parsed.shared_access_signature.as_deref(), + Some("sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=...") + ); + + let container_url = parsed.container_url("logs").unwrap(); + assert_eq!( + container_url, + "https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..." + ); + + let blob_url = parsed.blob_url("logs", "file name.txt").unwrap(); + assert_eq!( + blob_url, + "https://myacct.blob.core.windows.net/logs/file%20name.txt?sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..." + ); + assert_eq!( + parsed.auth(), + Auth::Sas { + query: "sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..." + .to_string() + } + ); + } + + #[test] + fn parse_sas_with_leading_question_mark() { + let cs = "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=?sv=2022-11-02&ss=b"; + let parsed = ParsedConnectionString::parse(cs).unwrap(); + assert_eq!( + parsed.shared_access_signature.as_deref(), + Some("sv=2022-11-02&ss=b") + ); + let url = parsed.container_url("logs").unwrap(); + assert_eq!( + url, + "https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b" + ); + } + + #[test] + fn parse_development_storage_with_defaults() { + let cs = + "UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1"; + let parsed = ParsedConnectionString::parse(cs).unwrap(); + let base = parsed.blob_account_endpoint().unwrap(); + assert_eq!(base, "http://127.0.0.1:10000/devstoreaccount1"); + + let container_url = parsed.container_url("logs").unwrap(); + assert_eq!( + container_url, + "http://127.0.0.1:10000/devstoreaccount1/logs" + ); + } + + #[test] + fn parse_development_storage_with_proxy() { + let cs = "UseDevelopmentStorage=true;AccountName=devstoreaccount1;DevelopmentStorageProxyUri=http://localhost:10000"; + let parsed = ParsedConnectionString::parse(cs).unwrap(); + let base = parsed.blob_account_endpoint().unwrap(); + assert_eq!(base, "http://localhost:10000/devstoreaccount1"); + + let container_url = parsed.container_url("logs").unwrap(); + assert_eq!( + container_url, + "http://localhost:10000/devstoreaccount1/logs" + ); + } + + #[test] + fn parse_invalid_pairs() { + let cs = "AccountName;AccountKey=noequals"; + let err = ParsedConnectionString::parse(cs).unwrap_err(); + match err { + ConnectionStringError::InvalidPair(p) => { + assert!(p == "AccountName" || p == "AccountKey=noequals") + } + _ => panic!("unexpected error: {err}"), + } + } +} diff --git a/src/sinks/azure_common/mod.rs b/src/sinks/azure_common/mod.rs index 4d1c931977f03..368c88164fd8d 100644 --- a/src/sinks/azure_common/mod.rs +++ b/src/sinks/azure_common/mod.rs @@ -1,3 +1,5 @@ pub mod config; +pub mod connection_string; pub mod service; +pub mod shared_key_policy; pub mod sink; diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index d78e39fc31837..dca2ab0c20953 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -4,7 +4,8 @@ use std::{ task::{Context, Poll}, }; -use azure_storage_blobs::prelude::*; +use azure_core::http::RequestContent; +use azure_storage_blob::{BlobContainerClient, models::BlockBlobClientUploadOptions}; use futures::future::BoxFuture; use tower::Service; use tracing::Instrument; @@ -13,11 +14,12 @@ use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse}; #[derive(Clone)] pub struct AzureBlobService { - client: Arc, + // Using the new azure_storage_blob container client. + client: Arc, } impl AzureBlobService { - pub const fn new(client: Arc) -> AzureBlobService { + pub const fn new(client: Arc) -> AzureBlobService { AzureBlobService { client } } } @@ -37,26 +39,29 @@ impl Service for AzureBlobService { let this = self.clone(); Box::pin(async move { - let client = this + let blob_client = this .client .blob_client(request.metadata.partition_key.as_str()); let byte_size = request.blob_data.len(); - let blob = client - .put_block_blob(request.blob_data) - .content_type(request.content_type); - let blob = match request.content_encoding { - Some(encoding) => blob.content_encoding(encoding), - None => blob, - }; + let mut upload_options = BlockBlobClientUploadOptions::default(); + upload_options.blob_content_type = Some(request.content_type.to_string()); + if let Some(encoding) = request.content_encoding { + upload_options.blob_content_encoding = Some(encoding.to_string()); + } - let result = blob - .into_future() + let result = blob_client + .upload( + RequestContent::from(request.blob_data.to_vec()), + false, + byte_size as u64, + Some(upload_options), + ) .instrument(info_span!("request").or_current()) .await .map_err(|err| err.into()); - result.map(|inner| AzureBlobResponse { - inner, + result.map(|_resp| AzureBlobResponse { + inner: (), events_byte_size: request .request_metadata .into_events_estimated_json_encoded_byte_size(), diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs new file mode 100644 index 0000000000000..886026cc5b27f --- /dev/null +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -0,0 +1,217 @@ +use std::{collections::BTreeMap, fmt::Write as _, sync::Arc}; + +use async_trait::async_trait; +use azure_core::http::policies::{Policy, PolicyResult}; +use azure_core::http::{Context, Request, Url}; +use azure_core::{ + Result as AzureResult, base64, + error::Error as AzureError, + time::{OffsetDateTime, to_rfc7231}, +}; + +use openssl::{hash::MessageDigest, pkey::PKey, sign::Signer}; + +/// Shared Key authorization policy for Azure Blob Storage requests. +/// +/// This policy injects the required headers (x-ms-date, x-ms-version) if missing and +/// adds the `Authorization: SharedKey {account}:{signature}` header. The signature +/// is computed according to the "Authorize with Shared Key" rules for the Blob service: +/// +/// StringToSign = +/// VERB + "\n" + +/// Content-Encoding + "\n" + +/// Content-Language + "\n" + +/// Content-Length + "\n" + +/// Content-MD5 + "\n" + +/// Content-Type + "\n" + +/// Date + "\n" + +/// If-Modified-Since + "\n" + +/// If-Match + "\n" + +/// If-None-Match + "\n" + +/// If-Unmodified-Since + "\n" + +/// Range + "\n" + +/// CanonicalizedHeaders + +/// CanonicalizedResource +/// +/// Notes: +/// - We set x-ms-date, leaving the standard Date field empty in the signature. +/// - If Content-Length header is present with "0", the canonicalized value must be the empty string. +/// - Canonicalized headers include all x-ms-* headers (lowercased, sorted). +/// - Canonicalized resource is "/{account}{path}\n" + sorted lowercase query params. +/// +#[derive(Debug)] +pub struct SharedKeyAuthorizationPolicy { + account_name: String, + account_key: Vec, // decoded from base64 + storage_version: String, +} + +impl SharedKeyAuthorizationPolicy { + /// Create a new shared key policy. + /// + /// - `account_name`: The storage account name. + /// - `account_key_b64`: Base64-encoded storage account key. + /// - `storage_version`: x-ms-version value to send (e.g. "2025-11-05"). + pub fn new( + account_name: String, + account_key_b64: String, + storage_version: String, + ) -> AzureResult { + let account_key = base64::decode(account_key_b64.as_bytes()).map_err(|e| { + AzureError::with_message( + azure_core::error::ErrorKind::Other, + format!("invalid account key base64: {e}"), + ) + })?; + Ok(Self { + account_name, + account_key, + storage_version, + }) + } + + fn ensure_ms_headers(&self, request: &mut Request) -> AzureResult<(String, String)> { + let now = OffsetDateTime::now_utc(); + let ms_date = to_rfc7231(&now); + // Insert owned values to avoid borrowing issues + request.insert_header("x-ms-date", ms_date.clone()); + request.insert_header("x-ms-version", self.storage_version.clone()); + Ok((ms_date, self.storage_version.clone())) + } + + fn build_string_to_sign( + &self, + req: &Request, + ms_date: &str, + ms_version: &str, + ) -> AzureResult { + let method = req.method().as_str(); + let url = req.url(); + + let mut s = String::with_capacity(512); + + // VERB + s.push_str(method); + s.push('\n'); + + // Content-Encoding + s.push('\n'); + // Content-Language + s.push('\n'); + // Content-Length + s.push('\n'); + // Content-MD5 + s.push('\n'); + // Content-Type + s.push('\n'); + // Date (unused when x-ms-date is used) + s.push('\n'); + // If-Modified-Since + s.push('\n'); + // If-Match + s.push('\n'); + // If-None-Match + s.push('\n'); + // If-Unmodified-Since + s.push('\n'); + // Range + s.push('\n'); + + // CanonicalizedHeaders (only those we know we set) + s.push_str("x-ms-date:"); + s.push_str(ms_date); + s.push('\n'); + s.push_str("x-ms-version:"); + s.push_str(ms_version); + s.push('\n'); + + // CanonicalizedResource + append_canonicalized_resource(&mut s, &self.account_name, url)?; + + Ok(s) + } + + fn sign(&self, string_to_sign: &str) -> AzureResult { + let pkey = PKey::hmac(&self.account_key).map_err(|e| { + AzureError::with_message( + azure_core::error::ErrorKind::Other, + format!("failed to create HMAC key: {e}"), + ) + })?; + let mut signer = Signer::new(MessageDigest::sha256(), &pkey).map_err(|e| { + AzureError::with_message( + azure_core::error::ErrorKind::Other, + format!("failed to create signer: {e}"), + ) + })?; + signer.update(string_to_sign.as_bytes()).map_err(|e| { + AzureError::with_message( + azure_core::error::ErrorKind::Other, + format!("signer update failed: {e}"), + ) + })?; + let mac = signer.sign_to_vec().map_err(|e| { + AzureError::with_message( + azure_core::error::ErrorKind::Other, + format!("signer sign failed: {e}"), + ) + })?; + Ok(base64::encode(&mac)) + } +} + +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +impl Policy for SharedKeyAuthorizationPolicy { + async fn send( + &self, + ctx: &Context, + request: &mut Request, + next: &[Arc], + ) -> PolicyResult { + // Ensure required x-ms headers are present + let (ms_date, ms_version) = self.ensure_ms_headers(request)?; + // Build string to sign + let sts = self.build_string_to_sign(request, &ms_date, &ms_version)?; + let signature = self.sign(&sts)?; + + // Authorization: SharedKey {account}:{signature} + request.insert_header( + "authorization", + format!("SharedKey {}:{}", self.account_name, signature), + ); + + // Continue pipeline + next[0].send(ctx, request, &next[1..]).await + } +} + +// ---------- Helpers ---------- + +fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> AzureResult<()> { + // "/{account_name}{path}\n" + s.push('/'); + s.push_str(account); + // Path is percent-decoded by Url, but we use it as-is (leading slash included). + s.push_str(url.path()); + + // Canonicalized query: lowercase names, sort by name, join multi-values by comma, each line "name:value\n" + if let Some(_) = url.query() { + let mut qp_map: BTreeMap> = BTreeMap::new(); + for (name, value) in url.query_pairs() { + let key_l = name.to_ascii_lowercase(); + qp_map.entry(key_l).or_default().push(value.to_string()); + } + for (k, mut vals) in qp_map { + // Sort values (optional but deterministic) + vals.sort(); + let mut line = String::new(); + let _ = write!(&mut line, "\n{}:", k); + let joined = vals.join(","); + line.push_str(&joined); + s.push_str(&line); + } + } + + Ok(()) +} From 62136c401d7000bf9a0898e590c9ba4ad4c968b2 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Mon, 24 Nov 2025 22:43:21 -0600 Subject: [PATCH 02/13] add a reference comment for empty headers --- src/sinks/azure_common/shared_key_policy.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index 886026cc5b27f..e84388d64b771 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -94,6 +94,9 @@ impl SharedKeyAuthorizationPolicy { s.push_str(method); s.push('\n'); + // Newline characters for empty headers + // https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key + // Content-Encoding s.push('\n'); // Content-Language From 148419260c75f067ca7aa9ad18712597d5fb90c2 Mon Sep 17 00:00:00 2001 From: Josh Coughlan Date: Tue, 18 Nov 2025 14:04:55 -0600 Subject: [PATCH 03/13] Update Cargo.toml Co-authored-by: Thomas --- Cargo.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9e6c8fe426d71..7d6b75cadb49f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -461,8 +461,6 @@ openssl-src = { version = "300", default-features = false, features = ["force-en approx = "0.5.1" assert_cmd = { version = "2.0.17", default-features = false } aws-smithy-runtime = { version = "1.8.3", default-features = false, features = ["tls-rustls"] } -azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] } -azure_identity = { version = "0.30", default-features = false } base64 = "0.22.1" criterion = { version = "0.7.0", features = ["html_reports", "async_tokio"] } itertools.workspace = true From 7785983170612e05aa05c183c6dc935bc397dc06 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Wed, 7 Jan 2026 14:28:00 -0600 Subject: [PATCH 04/13] Fix failing tests --- LICENSE-3rdparty.csv | 14 +--- src/sinks/azure_blob/integration_tests.rs | 87 ++++++++++--------- src/sinks/azure_common/connection_string.rs | 93 +++++++++------------ src/sinks/azure_common/service.rs | 10 +-- src/sinks/azure_common/shared_key_policy.rs | 2 +- 5 files changed, 90 insertions(+), 116 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index b507953d3272b..9e723f89a93e8 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -1,6 +1,5 @@ Component,Origin,License,Copyright Inflector,https://github.com/whatisinternet/inflector,BSD-2-Clause,Josh Teeter -RustyXML,https://github.com/Florob/RustyXML,MIT OR Apache-2.0,Florian Zeitz addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line Authors adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink , oyvindln " adler32,https://github.com/remram44/adler32-rs,Zlib,Remi Rampin @@ -104,11 +103,8 @@ aws-types,https://github.com/smithy-lang/smithy-rs,Apache-2.0,"AWS Rust SDK Team axum,https://github.com/tokio-rs/axum,MIT,The axum Authors axum-core,https://github.com/tokio-rs/axum,MIT,The axum-core Authors azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft -azure_core,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp. -azure_identity,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft -azure_storage,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp. -azure_storage_blobs,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft Corp. -azure_svc_blobstorage,https://github.com/azure/azure-sdk-for-rust,MIT,The azure_svc_blobstorage Authors +azure_core_macros,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft +azure_storage_blob,https://github.com/azure/azure-sdk-for-rust,MIT,Microsoft backoff,https://github.com/ihrwein/backoff,MIT OR Apache-2.0,Tibor Benke backon,https://github.com/Xuanwo/backon,Apache-2.0,The backon Authors backtrace,https://github.com/rust-lang/backtrace-rs,MIT OR Apache-2.0,The Rust Project Developers @@ -358,7 +354,6 @@ http-body,https://github.com/hyperium/http-body,MIT,"Carl Lerche , Lucio Franco , Sean McArthur " http-range-header,https://github.com/MarcusGrass/parse-range-headers,MIT,The http-range-header Authors http-serde,https://gitlab.com/kornelski/http-serde,Apache-2.0 OR MIT,Kornel -http-types,https://github.com/http-rs/http-types,MIT OR Apache-2.0,Yoshua Wuyts httparse,https://github.com/seanmonstar/httparse,MIT OR Apache-2.0,Sean McArthur httpdate,https://github.com/pyfisch/httpdate,MIT OR Apache-2.0,Pyfisch humantime,https://github.com/chronotope/humantime,MIT OR Apache-2.0,The humantime Authors @@ -389,7 +384,6 @@ idna_adapter,https://github.com/hsivonen/idna_adapter,Apache-2.0 OR MIT,The rust indexmap,https://github.com/bluss/indexmap,Apache-2.0 OR MIT,The indexmap Authors indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap Authors indoc,https://github.com/dtolnay/indoc,MIT OR Apache-2.0,David Tolnay -infer,https://github.com/bojand/infer,MIT,Bojan influxdb-line-protocol,https://github.com/influxdata/influxdb_iox/tree/main/influxdb_line_protocol,MIT OR Apache-2.0,InfluxDB IOx Project Developers inotify,https://github.com/hannobraun/inotify,ISC,"Hanno Braun , Félix Saparelli , Cristian Kubis , Frank Denis " inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun @@ -404,6 +398,7 @@ ipconfig,https://github.com/liranringel/ipconfig,MIT OR Apache-2.0,Liran Ringel ipcrypt-rs,https://github.com/jedisct1/rust-ipcrypt2,ISC,Frank Denis ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price ipnetwork,https://github.com/achanda/ipnetwork,MIT OR Apache-2.0,"Abhishek Chanda , Linus Färnstrand " +iri-string,https://github.com/lo48576/iri-string,MIT OR Apache-2.0,YOSHIOKA Takuma is-terminal,https://github.com/sunfishcode/is-terminal,MIT,"softprops , Dan Gohman " is_ci,https://github.com/zkat/is_ci,ISC,Kat Marchán itertools,https://github.com/rust-itertools/itertools,MIT OR Apache-2.0,bluss @@ -621,7 +616,6 @@ rand,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Dev rand_chacha,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers, The CryptoCorrosion Contributors" rand_core,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers" rand_distr,https://github.com/rust-random/rand_distr,MIT OR Apache-2.0,The Rand Project Developers -rand_hc,https://github.com/rust-random/rand,MIT OR Apache-2.0,The Rand Project Developers rand_xorshift,https://github.com/rust-random/rngs,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers" ratatui,https://github.com/ratatui/ratatui,MIT,"Florian Dehau , The Ratatui Developers" raw-cpuid,https://github.com/gz/rust-cpuid,MIT,Gerd Zellweger @@ -700,7 +694,6 @@ serde_json,https://github.com/serde-rs/json,MIT OR Apache-2.0,"Erick Tryzelaar < serde_nanos,https://github.com/caspervonb/serde_nanos,MIT OR Apache-2.0,Casper Beyer serde_path_to_error,https://github.com/dtolnay/path-to-error,MIT OR Apache-2.0,David Tolnay serde_plain,https://github.com/mitsuhiko/serde-plain,MIT OR Apache-2.0,Armin Ronacher -serde_qs,https://github.com/samscott89/serde_qs,MIT OR Apache-2.0,Sam Scott serde_repr,https://github.com/dtolnay/serde-repr,MIT OR Apache-2.0,David Tolnay serde_spanned,https://github.com/toml-rs/toml,MIT OR Apache-2.0,The serde_spanned Authors serde_urlencoded,https://github.com/nox/serde_urlencoded,MIT OR Apache-2.0,Anthony Ramine @@ -876,6 +869,7 @@ wasm-timer,https://github.com/tomaka/wasm-timer,MIT,Pierre Krieger diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index 1ffa70976a697..ca2969913e049 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -1,11 +1,7 @@ -use std::{ - io::{BufRead, BufReader}, - num::NonZeroU32, -}; +use std::io::{BufRead, BufReader}; use azure_core::http::StatusCode; -use azure_storage_blob::BlobContainerClient; use bytes::{Buf, BytesMut}; use flate2::read::GzDecoder; use futures::{Stream, StreamExt, stream}; @@ -83,7 +79,7 @@ async fn azure_blob_insert_lines_into_blob() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log")); - let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; assert_eq!(content_type, Some(String::from("text/plain"))); assert_eq!(lines, blob_lines); } @@ -138,9 +134,9 @@ async fn azure_blob_insert_lines_into_blob_gzip() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log.gz")); - let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await; - assert_eq!(blob.properties.content_encoding, Some(String::from("gzip"))); - assert_eq!(blob.properties.content_type, String::from("text/plain")); + let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!(content_encoding, Some(String::from("gzip"))); + assert_eq!(content_type, Some(String::from("text/plain"))); assert_eq!(lines, blob_lines); } @@ -168,12 +164,9 @@ async fn azure_blob_insert_json_into_blob_gzip() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log.gz")); - let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await; - assert_eq!(blob.properties.content_encoding, Some(String::from("gzip"))); - assert_eq!( - blob.properties.content_type, - String::from("application/x-ndjson") - ); + let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + assert_eq!(content_encoding, Some(String::from("gzip"))); + assert_eq!(content_type, Some(String::from("application/x-ndjson"))); let expected = events .iter() .map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap()) @@ -204,7 +197,7 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached() { assert_eq!(blobs.len(), 3); let response = stream::iter(blobs) .fold(Vec::new(), |mut acc, blob| async { - let (_, lines) = config.get_blob(blob).await; + let (_, _, lines) = config.get_blob(blob).await; acc.push(lines); acc }) @@ -260,32 +253,24 @@ impl AzureBlobSinkConfig { ) .unwrap(); - // Use new SDK pager to fetch first page and collect blob names. + // Iterate pager results and collect blob names. Filter by prefix server-side. let mut pager = client .list_blobs(None) .expect("Failed to start list blobs pager"); - let page = pager - .next() - .await - .expect("Failed to fetch blobs") - .into_body(); - - // Best-effort extraction of names from the page body. - // Depending on SDK struct names, this may need tweaking: - // ListBlobsFlatSegmentResponse { segment: { blob_items: [{ name, .. }, ..] }, .. } - let names = page - .segment - .blob_items - .into_iter() - .map(|b| b.name) - .filter(|name| name.starts_with(&prefix)) - .collect::>(); + let mut names = Vec::new(); + while let Some(result) = pager.next().await { + let item = result.expect("Failed to fetch blobs"); + if let Some(name) = item.name.and_then(|bn| bn.content) + && name.starts_with(&prefix) + { + names.push(name); + } + } names } pub async fn get_blob(&self, blob: String) -> (Option, Option, Vec) { - use azure_storage_blob::clients::BlobClient as _; let client = azure_common::config::build_client( self.connection_string.clone().into(), self.container_name.clone(), @@ -295,27 +280,39 @@ impl AzureBlobSinkConfig { let blob_client = client.blob_client(&blob); // Fetch properties to obtain content-type and content-encoding - let props = blob_client + let props_resp = blob_client .get_properties(None) .await - .expect("Failed to get blob properties") - .into_body(); - - let content_type = props.content_type.clone(); - let content_encoding = props.content_encoding.clone(); + .expect("Failed to get blob properties"); + let headers = props_resp.headers(); + let content_type = headers.iter().find_map(|(name, value)| { + let key = name.as_str(); + if key.eq_ignore_ascii_case("content-type") { + Some(value.as_str().to_string()) + } else { + None + } + }); + let content_encoding = headers.iter().find_map(|(name, value)| { + let key = name.as_str(); + if key.eq_ignore_ascii_case("content-encoding") { + Some(value.as_str().to_string()) + } else { + None + } + }); // Download blob content (full or first MB as needed) let downloaded = blob_client .download(None) .await .expect("Failed to download blob"); - let data = downloaded + let body_bytes = downloaded .into_body() - .data .collect() .await - .expect("Failed to read blob body") - .to_vec(); + .expect("Failed to read blob body"); + let data = body_bytes.to_vec(); (content_type, content_encoding, self.get_blob_content(data)) } @@ -344,7 +341,7 @@ impl AzureBlobSinkConfig { let response = match result { Ok(_) => Ok(()), Err(error) => match error.http_status() { - Some(status) if status.as_u16() == StatusCode::Conflict => Ok(()), + Some(StatusCode::Conflict) => Ok(()), _ => Err(error), }, }; diff --git a/src/sinks/azure_common/connection_string.rs b/src/sinks/azure_common/connection_string.rs index 990dcd99d6015..391f9d196a9dd 100644 --- a/src/sinks/azure_common/connection_string.rs +++ b/src/sinks/azure_common/connection_string.rs @@ -33,17 +33,17 @@ SAS handling Examples: - Access key connection string: "DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64key==;EndpointSuffix=core.windows.net" - Container URL: https://myacct.blob.core.windows.net/logs - Blob URL: https://myacct.blob.core.windows.net/logs/file.txt + Container URL: + Blob URL: - SAS connection string: - "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&..." - Container URL (with SAS): https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&... - Blob URL (with SAS): https://myacct.blob.core.windows.net/logs/file.txt?sv=2022-11-02&ss=b&... + "BlobEndpoint=;SharedAccessSignature=sv=2022-11-02&ss=b&..." + Container URL (with SAS): + Blob URL (with SAS): - Azurite/dev storage: "UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1" - Container URL: http://127.0.0.1:10000/devstoreaccount1/logs + Container URL: */ use std::collections::HashMap; @@ -87,7 +87,7 @@ pub enum Auth { } /// A parsed Azure Storage connection string and helpers to compose URLs for containers/blobs. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ParsedConnectionString { pub account_name: Option, pub account_key: Option, @@ -99,21 +99,6 @@ pub struct ParsedConnectionString { pub development_storage_proxy_uri: Option, } -impl Default for ParsedConnectionString { - fn default() -> Self { - Self { - account_name: None, - account_key: None, - shared_access_signature: None, - default_endpoints_protocol: None, - endpoint_suffix: None, - blob_endpoint: None, - use_development_storage: false, - development_storage_proxy_uri: None, - } - } -} - impl ParsedConnectionString { /// Parse a connection string into a `ParsedConnectionString`. /// @@ -135,28 +120,23 @@ impl ParsedConnectionString { } // Build the structure from the parsed map. - let mut parsed = ParsedConnectionString::default(); - - parsed.account_name = map.get("accountname").cloned(); - parsed.account_key = map.get("accountkey").cloned(); - parsed.shared_access_signature = map - .get("sharedaccesssignature") - .map(|s| normalize_sas(s.as_str())); - - parsed.default_endpoints_protocol = map - .get("defaultendpointsprotocol") - .map(|s| s.to_ascii_lowercase()); - - parsed.endpoint_suffix = map.get("endpointsuffix").cloned(); - - parsed.blob_endpoint = map.get("blobendpoint").cloned(); - - parsed.use_development_storage = map - .get("usedevelopmentstorage") - .map(|v| v.eq_ignore_ascii_case("true")) - .unwrap_or(false); - - parsed.development_storage_proxy_uri = map.get("developmentstorageproxyuri").cloned(); + let parsed = ParsedConnectionString { + account_name: map.get("accountname").cloned(), + account_key: map.get("accountkey").cloned(), + shared_access_signature: map + .get("sharedaccesssignature") + .map(|s| normalize_sas(s.as_str())), + default_endpoints_protocol: map + .get("defaultendpointsprotocol") + .map(|s| s.to_ascii_lowercase()), + endpoint_suffix: map.get("endpointsuffix").cloned(), + blob_endpoint: map.get("blobendpoint").cloned(), + use_development_storage: map + .get("usedevelopmentstorage") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false), + development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(), + }; Ok(parsed) } @@ -256,15 +236,18 @@ impl ParsedConnectionString { /// Build a blob URL, optionally appending SAS if present. pub fn blob_url(&self, container: &str, blob: &str) -> Result { - let container_url = self.container_url(container)?; + // Build the base container URL without SAS, then append the blob path, + // and finally append the SAS so it appears after the full path. + let base = self.blob_account_endpoint()?; + let container_no_sas = format!("{}/{}", trim_trailing_slash(&base), container); + let blob_full = format!( + "{}/{}", + trim_trailing_slash(&container_no_sas), + encode_path_segment(blob) + ); Ok(append_query_segment( - &format!( - "{}/{}", - trim_trailing_slash(&container_url), - encode_path_segment(blob) - ), - // container_url already handled SAS; if it already had query args, append with '&' - None, // SAS already appended at container level if present + &blob_full, + self.shared_access_signature.as_deref(), )) } } @@ -278,7 +261,7 @@ fn normalize_sas(s: &str) -> String { fn append_query_segment(base_url: &str, sas: Option<&str>) -> String { match sas { None => base_url.to_string(), - Some(q) if q.is_empty() => base_url.to_string(), + Some("") => base_url.to_string(), Some(q) => { let sep = if base_url.contains('?') { '&' } else { '?' }; format!("{base_url}{sep}{q}") @@ -288,8 +271,8 @@ fn append_query_segment(base_url: &str, sas: Option<&str>) -> String { /// Trim exactly one trailing slash from a string, if present. fn trim_trailing_slash(s: &str) -> String { - if s.ends_with('/') { - s[..s.len() - 1].to_string() + if let Some(stripped) = s.strip_suffix('/') { + stripped.to_string() } else { s.to_string() } diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index dca2ab0c20953..3e8e409179a23 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -43,11 +43,11 @@ impl Service for AzureBlobService { .client .blob_client(request.metadata.partition_key.as_str()); let byte_size = request.blob_data.len(); - let mut upload_options = BlockBlobClientUploadOptions::default(); - upload_options.blob_content_type = Some(request.content_type.to_string()); - if let Some(encoding) = request.content_encoding { - upload_options.blob_content_encoding = Some(encoding.to_string()); - } + let upload_options = BlockBlobClientUploadOptions { + blob_content_type: Some(request.content_type.to_string()), + blob_content_encoding: request.content_encoding.map(|e| e.to_string()), + ..Default::default() + }; let result = blob_client .upload( diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index e84388d64b771..0a38d359a5724 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -199,7 +199,7 @@ fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> Az s.push_str(url.path()); // Canonicalized query: lowercase names, sort by name, join multi-values by comma, each line "name:value\n" - if let Some(_) = url.query() { + if url.query().is_some() { let mut qp_map: BTreeMap> = BTreeMap::new(); for (name, value) in url.query_pairs() { let key_l = name.to_ascii_lowercase(); From 74c0515284d8af7cce040e12bf574fa3db55d7d8 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Wed, 14 Jan 2026 14:25:16 -0600 Subject: [PATCH 05/13] Add optional = true to azure_core --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7d6b75cadb49f..fcf3c09fc54a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -288,7 +288,7 @@ aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional aws-smithy-types = { version = "1.2.11", default-features = false, features = ["rt-tokio"], optional = true } # Azure -azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"] } +azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"], optional = true } azure_identity = { version = "0.30", default-features = false, optional = true } # Azure Storage From 94af583dc2101cc2d709ae45b3a3835c5dafbe6f Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Wed, 14 Jan 2026 14:27:04 -0600 Subject: [PATCH 06/13] Add dep:azure_core to sinks-azure_blob --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fcf3c09fc54a1..0b0e5ae29ff99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -846,7 +846,7 @@ sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"] sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] -sinks-azure_blob = ["dep:azure_storage_blob"] +sinks-azure_blob = ["dep:azure_core", "dep:azure_storage_blob"] sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] From 30d4829fc75784612eaeef2afa6eca3b23257e39 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Wed, 14 Jan 2026 17:26:32 -0600 Subject: [PATCH 07/13] more changes suggested in the PR --- src/sinks/azure_blob/integration_tests.rs | 3 ++- src/sinks/azure_common/config.rs | 1 - src/sinks/azure_common/service.rs | 1 - src/sinks/azure_common/shared_key_policy.rs | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/sinks/azure_blob/integration_tests.rs b/src/sinks/azure_blob/integration_tests.rs index ca2969913e049..00ef11f17030a 100644 --- a/src/sinks/azure_blob/integration_tests.rs +++ b/src/sinks/azure_blob/integration_tests.rs @@ -79,8 +79,9 @@ async fn azure_blob_insert_lines_into_blob() { let blobs = config.list_blobs(blob_prefix).await; assert_eq!(blobs.len(), 1); assert!(blobs[0].clone().ends_with(".log")); - let (content_type, _content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; + let (content_type, content_encoding, blob_lines) = config.get_blob(blobs[0].clone()).await; assert_eq!(content_type, Some(String::from("text/plain"))); + assert_eq!(content_encoding, None); assert_eq!(lines, blob_lines); } diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 5fe7c7ede2596..bfb4a89629ff3 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -73,7 +73,6 @@ impl RetryLogic for AzureBlobRetryLogic { #[derive(Debug)] pub struct AzureBlobResponse { - pub inner: (), pub events_byte_size: GroupedCountByteSize, pub byte_size: usize, } diff --git a/src/sinks/azure_common/service.rs b/src/sinks/azure_common/service.rs index 3e8e409179a23..1f1d322156f78 100644 --- a/src/sinks/azure_common/service.rs +++ b/src/sinks/azure_common/service.rs @@ -61,7 +61,6 @@ impl Service for AzureBlobService { .map_err(|err| err.into()); result.map(|_resp| AzureBlobResponse { - inner: (), events_byte_size: request .request_metadata .into_events_estimated_json_encoded_byte_size(), diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index 0a38d359a5724..0d2545d13202c 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -199,6 +199,7 @@ fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> Az s.push_str(url.path()); // Canonicalized query: lowercase names, sort by name, join multi-values by comma, each line "name:value\n" + // https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#shared-key-format-for-2009-09-19-and-later if url.query().is_some() { let mut qp_map: BTreeMap> = BTreeMap::new(); for (name, value) in url.query_pairs() { @@ -206,7 +207,6 @@ fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> Az qp_map.entry(key_l).or_default().push(value.to_string()); } for (k, mut vals) in qp_map { - // Sort values (optional but deterministic) vals.sort(); let mut line = String::new(); let _ = write!(&mut line, "\n{}:", k); From 5faac279910a0255562f2379585408869b69badb Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Fri, 16 Jan 2026 13:22:34 -0600 Subject: [PATCH 08/13] Fix shared_key_policy --- src/sinks/azure_common/config.rs | 2 +- src/sinks/azure_common/shared_key_policy.rs | 109 +++++++++++++++++--- 2 files changed, 95 insertions(+), 16 deletions(-) diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index bfb4a89629ff3..8792e7b2d51ce 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -152,7 +152,7 @@ pub fn build_client( let policy = SharedKeyAuthorizationPolicy::new( account_name, account_key, - // Use the same version as the generated client defaults + // Use an Azurite-supported storage service version String::from("2025-11-05"), ) .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?; diff --git a/src/sinks/azure_common/shared_key_policy.rs b/src/sinks/azure_common/shared_key_policy.rs index 0d2545d13202c..5724484d3c1cb 100644 --- a/src/sinks/azure_common/shared_key_policy.rs +++ b/src/sinks/azure_common/shared_key_policy.rs @@ -71,12 +71,13 @@ impl SharedKeyAuthorizationPolicy { } fn ensure_ms_headers(&self, request: &mut Request) -> AzureResult<(String, String)> { + // Always set x-ms-date and x-ms-version explicitly to known values for signing. let now = OffsetDateTime::now_utc(); let ms_date = to_rfc7231(&now); - // Insert owned values to avoid borrowing issues request.insert_header("x-ms-date", ms_date.clone()); - request.insert_header("x-ms-version", self.storage_version.clone()); - Ok((ms_date, self.storage_version.clone())) + let ms_version = self.storage_version.clone(); + request.insert_header("x-ms-version", ms_version.clone()); + Ok((ms_date, ms_version)) } fn build_string_to_sign( @@ -94,39 +95,105 @@ impl SharedKeyAuthorizationPolicy { s.push_str(method); s.push('\n'); - // Newline characters for empty headers - // https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key + // Resolve standard headers (case-insensitive) and write them in order required by the spec. + // https://learn.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#shared-key-format-for-2009-09-19-and-later + let header = |name: &str| -> Option<&str> { + for (n, v) in req.headers().iter() { + if n.as_str().eq_ignore_ascii_case(name) { + return Some(v.as_str()); + } + } + None + }; // Content-Encoding + if let Some(v) = header("Content-Encoding") { + s.push_str(v); + } s.push('\n'); + // Content-Language + if let Some(v) = header("Content-Language") { + s.push_str(v); + } s.push('\n'); - // Content-Length + + // Content-Length (include value if present; keep "0") + if let Some(v) = header("Content-Length") { + s.push_str(v); + } s.push('\n'); + // Content-MD5 + if let Some(v) = header("Content-MD5") { + s.push_str(v); + } s.push('\n'); + // Content-Type + if let Some(v) = header("Content-Type") { + s.push_str(v); + } s.push('\n'); + // Date (unused when x-ms-date is used) s.push('\n'); + // If-Modified-Since + if let Some(v) = header("If-Modified-Since") { + s.push_str(v); + } s.push('\n'); + // If-Match + if let Some(v) = header("If-Match") { + s.push_str(v); + } s.push('\n'); + // If-None-Match + if let Some(v) = header("If-None-Match") { + s.push_str(v); + } s.push('\n'); + // If-Unmodified-Since + if let Some(v) = header("If-Unmodified-Since") { + s.push_str(v); + } s.push('\n'); + // Range + if let Some(v) = header("Range") { + s.push_str(v); + } s.push('\n'); - // CanonicalizedHeaders (only those we know we set) - s.push_str("x-ms-date:"); - s.push_str(ms_date); - s.push('\n'); - s.push_str("x-ms-version:"); - s.push_str(ms_version); - s.push('\n'); + // CanonicalizedHeaders: include all x-ms-* headers, lowercased, sorted by name. + // If multiple values for the same header exist, sort values and join with commas. + let mut xms: BTreeMap> = BTreeMap::new(); + for (name, value) in req.headers().iter() { + let key = name.as_str().to_ascii_lowercase(); + if key.starts_with("x-ms-") { + xms.entry(key) + .or_default() + .push(value.as_str().trim().to_string()); + } + } + // Ensure required headers are present (they should have been inserted). + xms.entry("x-ms-date".to_string()) + .or_default() + .push(ms_date.to_string()); + xms.entry("x-ms-version".to_string()) + .or_default() + .push(ms_version.to_string()); + + for (k, mut vals) in xms { + vals.sort(); + vals.dedup(); + let joined = vals.join(","); + let _ = writeln!(s, "{}:{}", k, joined); + } // CanonicalizedResource append_canonicalized_resource(&mut s, &self.account_name, url)?; @@ -176,6 +243,14 @@ impl Policy for SharedKeyAuthorizationPolicy { let (ms_date, ms_version) = self.ensure_ms_headers(request)?; // Build string to sign let sts = self.build_string_to_sign(request, &ms_date, &ms_version)?; + // // Debug string-to-sign for troubleshooting (safe: does not include key) + // let compact = sts.replace('\n', "\\n"); + // tracing::debug!( + // method = %request.method().as_str(), + // url = %request.url(), + // string_to_sign = %compact, + // "Azure shared key string_to_sign." + // ); let signature = self.sign(&sts)?; // Authorization: SharedKey {account}:{signature} @@ -195,7 +270,7 @@ fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> Az // "/{account_name}{path}\n" s.push('/'); s.push_str(account); - // Path is percent-decoded by Url, but we use it as-is (leading slash included). + // Append the URL path exactly as-is (per spec). s.push_str(url.path()); // Canonicalized query: lowercase names, sort by name, join multi-values by comma, each line "name:value\n" @@ -204,7 +279,11 @@ fn append_canonicalized_resource(s: &mut String, account: &str, url: &Url) -> Az let mut qp_map: BTreeMap> = BTreeMap::new(); for (name, value) in url.query_pairs() { let key_l = name.to_ascii_lowercase(); - qp_map.entry(key_l).or_default().push(value.to_string()); + let v = value.to_string(); + if v.is_empty() { + continue; + } + qp_map.entry(key_l).or_default().push(v); } for (k, mut vals) in qp_map { vals.sort(); From d75e21f4918e8626a8f7e36685009580fbdd83bb Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 16 Jan 2026 15:56:56 -0500 Subject: [PATCH 09/13] Bump reqwest to 0.12 and consolidate reqwest versions --- Cargo.lock | 53 +++++++++++--------------------- Cargo.toml | 4 +-- src/sinks/azure_common/config.rs | 6 ++-- 3 files changed, 23 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6820ed5864af3..d4fae6381059d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2818,9 +2818,9 @@ checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" [[package]] name = "cookie_store" -version = "0.21.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eac901828f88a5241ee0600950ab981148a18f2f756900ffba1b125ca6a3ef9" +checksum = "3fc4bff745c9b4c7fb1e97b25d13153da2bc7796260141df62378998d070207f" dependencies = [ "cookie", "document-features", @@ -3281,7 +3281,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.4", "percent-encoding", - "reqwest 0.12.24", + "reqwest 0.12.28", "semver", "serde", "serde_json", @@ -4506,7 +4506,7 @@ dependencies = [ "arc-swap", "futures 0.3.31", "log", - "reqwest 0.12.24", + "reqwest 0.12.28", "serde", "serde_derive", "serde_json", @@ -5343,19 +5343,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes 1.10.1", - "hyper 0.14.32", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "hyper-tls" version = "0.6.0" @@ -5960,7 +5947,7 @@ dependencies = [ "k8s-test-framework", "rand 0.9.2", "regex", - "reqwest 0.11.26", + "reqwest 0.12.28", "serde_json", "tokio", "tracing 0.1.41", @@ -7588,7 +7575,7 @@ dependencies = [ "md-5", "percent-encoding", "quick-xml 0.37.4", - "reqwest 0.12.24", + "reqwest 0.12.28", "serde", "serde_json", "tokio", @@ -9243,12 +9230,10 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.32", "hyper-rustls 0.24.2", - "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -9260,7 +9245,6 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", - "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -9273,9 +9257,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.24" +version = "0.12.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" dependencies = [ "base64 0.22.1", "bytes 1.10.1", @@ -9291,7 +9275,7 @@ dependencies = [ "http-body-util", "hyper 1.7.0", "hyper-rustls 0.27.5", - "hyper-tls 0.6.0", + "hyper-tls", "hyper-util", "js-sys", "log", @@ -9313,7 +9297,7 @@ dependencies = [ "tokio-rustls 0.26.2", "tokio-util", "tower 0.5.2", - "tower-http 0.6.6", + "tower-http 0.6.8", "tower-service", "url", "wasm-bindgen", @@ -9332,7 +9316,7 @@ dependencies = [ "anyhow", "async-trait", "http 1.3.1", - "reqwest 0.12.24", + "reqwest 0.12.28", "serde", "thiserror 1.0.68", "tower-service", @@ -9351,7 +9335,7 @@ dependencies = [ "http 1.3.1", "hyper 1.7.0", "parking_lot 0.11.2", - "reqwest 0.12.24", + "reqwest 0.12.28", "reqwest-middleware", "retry-policies", "thiserror 1.0.68", @@ -11788,9 +11772,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags 2.9.0", "bytes 1.10.1", @@ -12140,7 +12124,7 @@ dependencies = [ "getrandom 0.3.4", "pin-project", "rand 0.9.2", - "reqwest 0.12.24", + "reqwest 0.12.28", "serde", "serde_json", "time", @@ -12440,7 +12424,7 @@ dependencies = [ "owo-colors", "paste", "regex", - "reqwest 0.11.26", + "reqwest 0.12.28", "semver", "serde", "serde_json", @@ -12593,8 +12577,7 @@ dependencies = [ "rdkafka", "redis", "regex", - "reqwest 0.11.26", - "reqwest 0.12.24", + "reqwest 0.12.28", "rmp-serde", "rmpv", "roaring", @@ -13143,7 +13126,7 @@ dependencies = [ "quoted_printable", "rand 0.8.5", "regex", - "reqwest 0.12.24", + "reqwest 0.12.28", "reqwest-middleware", "reqwest-retry", "roxmltree", diff --git a/Cargo.toml b/Cargo.toml index 0b0e5ae29ff99..5590d6ae2313c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,7 +180,7 @@ prost-types = { version = "0.12", default-features = false } rand = { version = "0.9.2", default-features = false, features = ["small_rng", "thread_rng"] } rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } -reqwest = { version = "0.11.26", features = ["json"] } +reqwest = { version = "0.12", features = ["json"] } rust_decimal = { version = "1.37.0", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] } @@ -209,7 +209,7 @@ serial_test = { version = "3.2" } [dependencies] cfg-if.workspace = true -reqwest_0_12_24 = { package = "reqwest", version = "0.12.24", features = ["json"] } +reqwest.workspace = true clap.workspace = true indoc.workspace = true paste.workspace = true diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 8792e7b2d51ce..5fcd5a520ee8e 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -163,11 +163,11 @@ pub fn build_client( } } - // Force Azure SDK to use reqwest_0_12_24 transport to avoid affecting global reqwest + // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12. options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( - reqwest_0_12_24::ClientBuilder::new() + reqwest::ClientBuilder::new() .build() - .map_err(|e| format!("Failed to build reqwest_0_12_24 client: {e}"))?, + .map_err(|e| format!("Failed to build reqwest client: {e}"))?, ))); let client = BlobContainerClient::from_url(url, None, Some(options)).map_err(|e| format!("{e}"))?; From 89f3e9f6b7c611d4287f00c7b1ca49dc7829ad24 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Mon, 19 Jan 2026 21:05:15 -0600 Subject: [PATCH 10/13] remove azure_identity crate, use defaults for other azure --- Cargo.lock | 23 +++++------------------ Cargo.toml | 6 ++---- 2 files changed, 7 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4fae6381059d..6a0d4636bf0ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1727,23 +1727,6 @@ dependencies = [ "tracing 0.1.41", ] -[[package]] -name = "azure_identity" -version = "0.30.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f07bb0ee212021e75c3645e82d078e436b4b4184bde1295e9e81fcbcef923af" -dependencies = [ - "async-lock 3.4.0", - "async-trait", - "azure_core", - "futures 0.3.31", - "pin-project", - "serde", - "time", - "tracing 0.1.41", - "url", -] - [[package]] name = "azure_storage_blob" version = "0.7.0" @@ -11776,13 +11759,18 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ + "async-compression", "bitflags 2.9.0", "bytes 1.10.1", + "futures-core", "futures-util", "http 1.3.1", "http-body 1.0.0", + "http-body-util", "iri-string", "pin-project-lite", + "tokio", + "tokio-util", "tower 0.5.2", "tower-layer", "tower-service", @@ -12475,7 +12463,6 @@ dependencies = [ "aws-types", "axum 0.6.20", "azure_core", - "azure_identity", "azure_storage_blob", "base64 0.22.1", "bloomy", diff --git a/Cargo.toml b/Cargo.toml index 5590d6ae2313c..7a296222c1736 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -288,12 +288,10 @@ aws-smithy-runtime-api = { version = "1.7.3", default-features = false, optional aws-smithy-types = { version = "1.2.11", default-features = false, features = ["rt-tokio"], optional = true } # Azure -azure_core = { version = "0.30", default-features = false, features = ["reqwest", "hmac_openssl"], optional = true } -azure_identity = { version = "0.30", default-features = false, optional = true } +azure_core = { version = "0.30", features = ["reqwest", "hmac_openssl"], optional = true } # Azure Storage -azure_storage_blob = { version = "0.7", default-features = false, optional = true } - +azure_storage_blob = { version = "0.7", optional = true } # OpenDAL opendal = { version = "0.54", default-features = false, features = ["services-webhdfs"], optional = true } From 28d9027e1eec8733926fa1d71755988fb1060986 Mon Sep 17 00:00:00 2001 From: Thomas Date: Wed, 21 Jan 2026 14:02:05 -0500 Subject: [PATCH 11/13] Separate reqwest v0.11 and v0.12 again --- Cargo.lock | 27 ++++++++++++++++++++++----- Cargo.toml | 3 ++- src/sinks/azure_common/config.rs | 2 +- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e2f7be1b3cca..5ee83ae078c4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5304,6 +5304,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.10.1", + "hyper 0.14.32", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -5908,7 +5921,7 @@ dependencies = [ "k8s-test-framework", "rand 0.9.2", "regex", - "reqwest 0.12.28", + "reqwest 0.11.26", "serde_json", "tokio", "tracing 0.1.41", @@ -9256,10 +9269,12 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.32", "hyper-rustls 0.24.2", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -9271,6 +9286,7 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", + "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -9301,7 +9317,7 @@ dependencies = [ "http-body-util", "hyper 1.7.0", "hyper-rustls 0.27.5", - "hyper-tls", + "hyper-tls 0.6.0", "hyper-util", "js-sys", "log", @@ -11039,7 +11055,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "core-foundation 0.9.3", "system-configuration-sys 0.6.0", ] @@ -11811,7 +11827,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "async-compression", - "bitflags 2.9.0", + "bitflags 2.10.0", "bytes 1.10.1", "futures-core", "futures-util", @@ -12464,7 +12480,7 @@ dependencies = [ "owo-colors", "paste", "regex", - "reqwest 0.12.28", + "reqwest 0.11.26", "semver", "serde", "serde_json", @@ -12618,6 +12634,7 @@ dependencies = [ "rdkafka", "redis", "regex", + "reqwest 0.11.26", "reqwest 0.12.28", "rmp-serde", "rmpv", diff --git a/Cargo.toml b/Cargo.toml index b967c52c790ed..1e2a51d35a825 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,7 +182,7 @@ prost-types = { version = "0.12", default-features = false } rand = { version = "0.9.2", default-features = false, features = ["small_rng", "thread_rng"] } rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } -reqwest = { version = "0.12", features = ["json"] } +reqwest = { version = "0.11", features = ["json"] } rust_decimal = { version = "1.37.0", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] } @@ -212,6 +212,7 @@ serial_test = { version = "3.2" } [dependencies] cfg-if.workspace = true reqwest.workspace = true +reqwest_12 = { package = "reqwest", version = "0.12", features = ["json"] } clap.workspace = true clap_complete.workspace = true dashmap.workspace = true diff --git a/src/sinks/azure_common/config.rs b/src/sinks/azure_common/config.rs index 5fcd5a520ee8e..fd8dbd0741e13 100644 --- a/src/sinks/azure_common/config.rs +++ b/src/sinks/azure_common/config.rs @@ -165,7 +165,7 @@ pub fn build_client( // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12. options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new( - reqwest::ClientBuilder::new() + reqwest_12::ClientBuilder::new() .build() .map_err(|e| format!("Failed to build reqwest client: {e}"))?, ))); From cbfbf4ef2a8a57bcccdb36a0f4edadaf6409a697 Mon Sep 17 00:00:00 2001 From: Thomas Date: Wed, 21 Jan 2026 14:18:07 -0500 Subject: [PATCH 12/13] Allow CDLA-Permissive-2.0 --- deny.toml | 1 + licenses/CDLA-Permissive-2.0 | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 licenses/CDLA-Permissive-2.0 diff --git a/deny.toml b/deny.toml index 1ab8b4a9c4f76..32a8862fd30ec 100644 --- a/deny.toml +++ b/deny.toml @@ -6,6 +6,7 @@ allow = [ "BSD-3-Clause", "BSL-1.0", "CC0-1.0", + "CDLA-Permissive-2.0", "ISC", "MIT", "MIT-0", diff --git a/licenses/CDLA-Permissive-2.0 b/licenses/CDLA-Permissive-2.0 new file mode 100644 index 0000000000000..cc0f954b59ba2 --- /dev/null +++ b/licenses/CDLA-Permissive-2.0 @@ -0,0 +1,35 @@ +Community Data License Agreement - Permissive - Version 2.0 + +This is the Community Data License Agreement - Permissive, Version 2.0 (the "agreement"). Data Provider(s) and Data Recipient(s) agree as follows: + +1. Provision of the Data + +1.1. A Data Recipient may use, modify, and share the Data made available by Data Provider(s) under this agreement if that Data Recipient follows the terms of this agreement. + +1.2. This agreement does not impose any restriction on a Data Recipient's use, modification, or sharing of any portions of the Data that are in the public domain or that may be used, modified, or shared under any other legal exception or limitation. + +2. Conditions for Sharing Data + +2.1. A Data Recipient may share Data, with or without modifications, so long as the Data Recipient makes available the text of this agreement with the shared Data. + +3. No Restrictions on Results + +3.1. This agreement does not impose any restriction or obligations with respect to the use, modification, or sharing of Results. + +4. No Warranty; Limitation of Liability + +4.1. All Data Recipients receive the Data subject to the following terms: + +THE DATA IS PROVIDED ON AN "AS IS" BASIS, WITHOUT REPRESENTATIONS, WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. + +NO DATA PROVIDER SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE DATA OR RESULTS, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. + +5. Definitions + +5.1. "Data" means the material received by a Data Recipient under this agreement. + +5.2. "Data Provider" means any person who is the source of Data provided under this agreement and in reliance on a Data Recipient's agreement to its terms. + +5.3. "Data Recipient" means any person who receives Data directly or indirectly from a Data Provider and agrees to the terms of this agreement. + +5.4. "Results" means any outcome obtained by computational analysis of Data, including for example machine learning models and models' insights. From 2808b226695e195cc22483261583101d96c0b5e5 Mon Sep 17 00:00:00 2001 From: Joshua Coughlan Date: Thu, 29 Jan 2026 16:27:56 -0600 Subject: [PATCH 13/13] update dates in tests to 2026 so they pass --- website/cue/reference/remap/functions/parse_klog.cue | 2 +- .../cue/reference/remap/functions/parse_linux_authorization.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/cue/reference/remap/functions/parse_klog.cue b/website/cue/reference/remap/functions/parse_klog.cue index 921b2bbda567e..7d6cfb0464e5a 100644 --- a/website/cue/reference/remap/functions/parse_klog.cue +++ b/website/cue/reference/remap/functions/parse_klog.cue @@ -34,7 +34,7 @@ remap: functions: parse_klog: { level: "info" line: 70 message: "hello from klog" - timestamp: "2025-05-05T17:59:40.692994Z" + timestamp: "2026-05-05T17:59:40.692994Z" } }, ] diff --git a/website/cue/reference/remap/functions/parse_linux_authorization.cue b/website/cue/reference/remap/functions/parse_linux_authorization.cue index 1d1230e88d629..216898569da0d 100644 --- a/website/cue/reference/remap/functions/parse_linux_authorization.cue +++ b/website/cue/reference/remap/functions/parse_linux_authorization.cue @@ -39,7 +39,7 @@ remap: functions: parse_linux_authorization: { hostname: "localhost" message: "Accepted publickey for eng from 10.1.1.1 port 8888 ssh2: RSA SHA256:foobar" procid: 1111 - timestamp: "2025-03-23T01:49:58Z" + timestamp: "2026-03-23T01:49:58Z" } }, ]