Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 38 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ aws-smithy-http-tower = { version = "0.49.0", default-features = false, optional
aws-smithy-types = { version = "0.49.0", default-features = false, optional = true }

# Azure
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"], optional = true }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"], optional = true }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, optional = true }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"], optional = true }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"], optional = true }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, optional = true }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, optional = true }

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
Expand Down Expand Up @@ -332,10 +332,10 @@ tonic-build = { version = "0.8", default-features = false, features = ["transpor
[dev-dependencies]
approx = "0.5.1"
assert_cmd = { version = "2.0.4", default-features = false }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest", "azurite_workaround"] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"] }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["azurite_workaround"] }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["azurite_workaround"] }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest", "azurite_workaround"] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"] }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["azurite_workaround"] }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["azurite_workaround"] }
base64 = "0.13.0"
criterion = { version = "0.4.0", features = ["html_reports", "async_tokio"] }
libc = "0.2.134"
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AzureBlobSinkConfig {
/// [env_cred_docs]: https://docs.rs/azure_identity/latest/azure_identity/struct.EnvironmentCredential.html
/// [managed_ident_docs]: https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
/// [az_cli_docs]: https://docs.microsoft.com/en-us/cli/azure/account?view=azure-cli-latest#az-account-get-access-token
pub storage_account: Option<SensitiveString>,
pub storage_account: Option<String>,

/// The Azure Blob Storage Account container name.
pub(super) container_name: String,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl GenerateConfig for AzureBlobSinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
storage_account: Some(String::from("some-account-name").into()),
storage_account: Some(String::from("some-account-name")),
container_name: String::from("logs"),
blob_prefix: Some(String::from("blob")),
blob_time_format: Some(String::from("%s")),
Expand Down
47 changes: 28 additions & 19 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
num::NonZeroU32,
};

use azure_core::{prelude::Range, HttpError};
use azure_core::{error::HttpError, prelude::Range};
use azure_storage_blobs::prelude::*;
use bytes::{Buf, BytesMut};
use codecs::{
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn azure_blob_insert_lines_into_blob() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand All @@ -105,7 +105,7 @@ async fn azure_blob_insert_json_into_blob() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand All @@ -132,7 +132,7 @@ async fn azure_blob_insert_lines_into_blob_gzip() {

config.run_assert(events).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand Down Expand Up @@ -164,7 +164,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 3);
let response = stream::iter(blobs)
.fold(Vec::new(), |mut acc, blob| async {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl AzureBlobSinkConfig {
.expect("Running sink failed");
}

pub async fn list_blobs(&self, prefix: &str) -> Vec<String> {
pub async fn list_blobs(&self, prefix: String) -> Vec<String> {
let client = azure_common::config::build_client(
self.connection_string.clone().map(Into::into),
self.storage_account.clone().map(Into::into),
Expand All @@ -266,14 +266,15 @@ impl AzureBlobSinkConfig {
.max_results(NonZeroU32::new(1000).unwrap())
.delimiter("/")
.include_metadata(true)
.execute()
.into_stream()
.next()
.await
.expect("Failed to fetch blobs");
.expect("Failed to fetch blobs")
.unwrap();

let blobs = response
.blobs
.blobs
.iter()
.blobs()
.map(|blob| blob.name.clone())
.collect::<Vec<_>>();

Expand All @@ -288,14 +289,19 @@ impl AzureBlobSinkConfig {
)
.unwrap();
let response = client
.as_blob_client(blob.as_str())
.blob_client(blob)
.get()
.range(Range::new(0, 1024 * 1024))
.execute()
.into_stream()
.next()
.await
.expect("Failed to get blob");
.expect("Failed to get blob")
.unwrap();

(response.blob, self.get_blob_content(response.data.to_vec()))
(
response.blob,
self.get_blob_content(response.data.collect().await.unwrap().to_vec()),
)
}

fn get_blob_content(&self, data: Vec<u8>) -> Vec<String> {
Expand All @@ -318,14 +324,17 @@ impl AzureBlobSinkConfig {
self.container_name.clone(),
)
.unwrap();
let request = client.create().public_access(PublicAccess::None).execute();
let request = client
.create()
.public_access(PublicAccess::None)
.into_future();

let response = match request.await {
Ok(_) => Ok(()),
Err(reason) => match reason.downcast_ref::<HttpError>() {
Some(HttpError::StatusCode { status, .. }) => match *status {
StatusCode::CONFLICT => Ok(()),
status => Err(format!("Unexpected status code {}", status)),
Some(err) => match StatusCode::from_u16(err.status().into()) {
Ok(StatusCode::CONFLICT) => Ok(()),
_ => Err(format!("Unexpected status code {}", err.status())),
},
_ => Err(format!("Unexpected error {}", reason)),
},
Expand Down
Loading