Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 39 additions & 37 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,24 @@ aws-smithy-http-tower = { version = "0.49.0", default-features = false, optional
aws-smithy-types = { version = "0.49.0", default-features = false, optional = true }

# Azure
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"], optional = true }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"], optional = true }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, optional = true }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, optional = true }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"], optional = true }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"], optional = true }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, optional = true }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, optional = true }

# fix for:
#error: failed to select a version for `time`.
# ... required by package `azure_core v0.5.0 (https://github.com/Azure/azure-sdk-for-rust.git?rev=b4544d4920fa3064eb921340054cd9cc130b7664#b4544d49)`
# ... which satisfies git dependency `azure_core` of package `azure_identity v0.6.0 (https://github.com/Azure/azure-sdk-for-rust.git?rev=b4544d4920fa3064eb921340054cd9cc130b7664#b4544d49)`
# ... which satisfies git dependency `azure_identity` of package `vector v0.25.0 (/home/u112324/git/vector)`
#versions that meet the requirements `^0.3.10` are: 0.3.14, 0.3.13, 0.3.12, 0.3.11, 0.3.10
#
#all possible versions conflict with previously selected packages.
#
# previously selected package `time v0.3.9`
# ... which satisfies dependency `time = "^0.3.4"` (locked to 0.3.9) of package `aws-config v0.49.0`
# ... which satisfies dependency `aws-config = "^0.49.0"` (locked to 0.49.0) of package `vector v0.25.0 (/home/u112324/git/vector)`
time = { version = "0.3.10" }
Comment thread
yvespp marked this conversation as resolved.
Outdated

# Tower
tower = { version = "0.4.13", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
Expand Down Expand Up @@ -332,10 +346,10 @@ tonic-build = { version = "0.8", default-features = false, features = ["transpor
[dev-dependencies]
approx = "0.5.1"
assert_cmd = { version = "2.0.4", default-features = false }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest", "azurite_workaround"] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["enable_reqwest"] }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["azurite_workaround"] }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b7171eb40909f7f2805f4622e076f8a6dbbe2d98", default-features = false, features = ["azurite_workaround"] }
azure_core = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest", "azurite_workaround"] }
azure_identity = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["enable_reqwest"] }
azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["azurite_workaround"] }
azure_storage_blobs = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev = "b4544d4920fa3064eb921340054cd9cc130b7664", default-features = false, features = ["azurite_workaround"] }
base64 = "0.13.0"
criterion = { version = "0.4.0", features = ["html_reports", "async_tokio"] }
libc = "0.2.133"
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct AzureBlobSinkConfig {
/// [env_cred_docs]: https://docs.rs/azure_identity/latest/azure_identity/struct.EnvironmentCredential.html
/// [managed_ident_docs]: https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
/// [az_cli_docs]: https://docs.microsoft.com/en-us/cli/azure/account?view=azure-cli-latest#az-account-get-access-token
pub storage_account: Option<SensitiveString>,
pub storage_account: Option<String>,

/// The Azure Blob Storage Account container name.
pub(super) container_name: String,
Expand Down Expand Up @@ -117,7 +117,7 @@ impl GenerateConfig for AzureBlobSinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
storage_account: Some(String::from("some-account-name").into()),
storage_account: Some(String::from("some-account-name")),
container_name: String::from("logs"),
blob_prefix: Some(String::from("blob")),
blob_time_format: Some(String::from("%s")),
Expand Down
47 changes: 28 additions & 19 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
num::NonZeroU32,
};

use azure_core::{prelude::Range, HttpError};
use azure_core::{error::HttpError, prelude::Range};
use azure_storage_blobs::prelude::*;
use bytes::{Buf, BytesMut};
use codecs::{
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn azure_blob_insert_lines_into_blob() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand All @@ -105,7 +105,7 @@ async fn azure_blob_insert_json_into_blob() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand All @@ -132,7 +132,7 @@ async fn azure_blob_insert_lines_into_blob_gzip() {

config.run_assert(events).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand Down Expand Up @@ -164,7 +164,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 1);
assert!(blobs[0].clone().ends_with(".log.gz"));
let (blob, blob_lines) = config.get_blob(blobs[0].clone()).await;
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn azure_blob_rotate_files_after_the_buffer_size_is_reached() {

config.run_assert(input).await;

let blobs = config.list_blobs(blob_prefix.as_str()).await;
let blobs = config.list_blobs(blob_prefix).await;
assert_eq!(blobs.len(), 3);
let response = stream::iter(blobs)
.fold(Vec::new(), |mut acc, blob| async {
Expand Down Expand Up @@ -253,7 +253,7 @@ impl AzureBlobSinkConfig {
.expect("Running sink failed");
}

pub async fn list_blobs(&self, prefix: &str) -> Vec<String> {
pub async fn list_blobs(&self, prefix: String) -> Vec<String> {
let client = azure_common::config::build_client(
self.connection_string.clone().map(Into::into),
self.storage_account.clone().map(Into::into),
Expand All @@ -266,14 +266,15 @@ impl AzureBlobSinkConfig {
.max_results(NonZeroU32::new(1000).unwrap())
.delimiter("/")
.include_metadata(true)
.execute()
.into_stream()
.next()
.await
.expect("Failed to fetch blobs");
.expect("Failed to fetch blobs")
.unwrap();

let blobs = response
.blobs
.blobs
.iter()
.blobs()
.map(|blob| blob.name.clone())
.collect::<Vec<_>>();

Expand All @@ -288,14 +289,19 @@ impl AzureBlobSinkConfig {
)
.unwrap();
let response = client
.as_blob_client(blob.as_str())
.blob_client(blob)
.get()
.range(Range::new(0, 1024 * 1024))
.execute()
.into_stream()
.next()
.await
.expect("Failed to get blob");
.expect("Failed to get blob")
.unwrap();

(response.blob, self.get_blob_content(response.data.to_vec()))
(
response.blob,
self.get_blob_content(response.data.collect().await.unwrap().to_vec()),
)
}

fn get_blob_content(&self, data: Vec<u8>) -> Vec<String> {
Expand All @@ -318,14 +324,17 @@ impl AzureBlobSinkConfig {
self.container_name.clone(),
)
.unwrap();
let request = client.create().public_access(PublicAccess::None).execute();
let request = client
.create()
.public_access(PublicAccess::None)
.into_future();

let response = match request.await {
Ok(_) => Ok(()),
Err(reason) => match reason.downcast_ref::<HttpError>() {
Some(HttpError::StatusCode { status, .. }) => match *status {
StatusCode::CONFLICT => Ok(()),
status => Err(format!("Unexpected status code {}", status)),
Some(err) => match StatusCode::from_u16(err.status().into()) {
Ok(StatusCode::CONFLICT) => Ok(()),
_ => Err(format!("Unexpected status code {}", err.status())),
},
_ => Err(format!("Unexpected error {}", reason)),
},
Expand Down
Loading