Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ emoji
emqx
enableable
Enot
Entra
envsubst
EPC
esbuild
Expand Down Expand Up @@ -293,6 +294,7 @@ kubeval
Kurio
kustomization
kustomize
Kusto
kyocera
Kyros
Lenco
Expand Down
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ aws-smithy-types = { version = "1.2.11", default-features = false, features = ["

# Azure
azure_core = { version = "0.30", features = ["reqwest", "hmac_openssl"], optional = true }
azure_identity = { version = "0.30", optional = true }

# Azure Storage
azure_storage_blob = { version = "0.7", optional = true }
Expand Down Expand Up @@ -785,6 +786,7 @@ sinks-logs = [
"sinks-aws_sqs",
"sinks-axiom",
"sinks-azure_blob",
"sinks-azure_data_explorer",
"sinks-azure_monitor_logs",
"sinks-blackhole",
"sinks-chronicle",
Expand Down Expand Up @@ -852,6 +854,7 @@ sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
sinks-axiom = ["sinks-http"]
sinks-azure_blob = ["dep:azure_core", "dep:azure_storage_blob"]
sinks-azure_data_explorer = ["dep:azure_core", "dep:azure_identity", "dep:base64"]
sinks-azure_monitor_logs = []
sinks-blackhole = []
sinks-chronicle = []
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/azure_data_explorer_sink.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
A new `azure_data_explorer` sink has been added to deliver log events to Azure Data Explorer (Kusto) via queued ingestion. The sink supports Azure Entra ID (Azure AD) service principal authentication, configurable ingestion mapping references, gzip compression, and flexible batching options. Data is uploaded as JSONL to Azure Blob Storage and then enqueued for ingestion.

authors: benmali
107 changes: 107 additions & 0 deletions src/sinks/azure_data_explorer/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//! Azure Entra ID authentication for Azure Data Explorer.
//!
//! Uses [`azure_identity::ClientSecretCredential`] (from the official Azure SDK
//! for Rust) for service-principal client-credentials authentication, rather
//! than a hand-rolled OAuth2 flow.

use std::sync::Arc;

use azure_core::credentials::{Secret, TokenCredential};
use azure_identity::ClientSecretCredential;
use vector_lib::sensitive_string::SensitiveString;

/// Scope for Azure Data Explorer / Kusto API access.
const KUSTO_SCOPE: &str = "https://kusto.kusto.windows.net/.default";

// ---------------------------------------------------------------------------
// Internal trait: allows swapping in a mock for tests without needing to
// construct `azure_core::credentials::AccessToken` (which requires the `time`
// crate's `OffsetDateTime`).
// ---------------------------------------------------------------------------

#[async_trait::async_trait]
trait TokenProvider: Send + Sync {
async fn get_bearer_token(&self) -> crate::Result<String>;
}

/// Production token provider backed by [`ClientSecretCredential`].
struct EntraTokenProvider {
credential: Arc<ClientSecretCredential>,
}

#[async_trait::async_trait]
impl TokenProvider for EntraTokenProvider {
async fn get_bearer_token(&self) -> crate::Result<String> {
let access_token = self
.credential
.get_token(&[KUSTO_SCOPE], None)
.await
.map_err(|e| format!("Failed to acquire Azure Entra token: {e}"))?;

Ok(access_token.token.secret().to_string())
}
}

// ---------------------------------------------------------------------------
// Public auth wrapper
// ---------------------------------------------------------------------------

/// Azure Entra ID token provider for Azure Data Explorer.
///
/// Wraps [`azure_identity::ClientSecretCredential`] to acquire Bearer tokens
/// via the OAuth2 client-credentials flow. Token caching and refresh are
/// handled internally by the Azure SDK.
#[derive(Clone)]
pub(super) struct AzureDataExplorerAuth {
provider: Arc<dyn TokenProvider>,
}

impl AzureDataExplorerAuth {
/// Creates a new auth provider backed by [`ClientSecretCredential`].
pub(super) fn new(
tenant_id: &str,
client_id: String,
client_secret: SensitiveString,
) -> crate::Result<Self> {
let secret = Secret::from(client_secret.inner().to_string());
let credential = ClientSecretCredential::new(tenant_id, client_id, secret, None)
.map_err(|e| format!("Failed to create Azure credential: {e}"))?;

Ok(Self {
provider: Arc::new(EntraTokenProvider { credential }),
})
}

/// Creates a mock auth provider that always returns the given token.
/// For use in tests only.
#[cfg(test)]
pub(super) fn mock(token: impl Into<String>) -> Self {
Self {
provider: Arc::new(MockTokenProvider {
token: token.into(),
}),
}
}

/// Returns a valid Bearer access token string.
pub(super) async fn get_token(&self) -> crate::Result<String> {
self.provider.get_bearer_token().await
}
}

// ---------------------------------------------------------------------------
// Test-only mock
// ---------------------------------------------------------------------------

#[cfg(test)]
struct MockTokenProvider {
token: String,
}

#[cfg(test)]
#[async_trait::async_trait]
impl TokenProvider for MockTokenProvider {
async fn get_bearer_token(&self) -> crate::Result<String> {
Ok(self.token.clone())
}
}
Loading
Loading