diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index 7581a21c6be1a..bd2c52d5bb2d3 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -140,6 +140,7 @@ emoji emqx enableable Enot +Entra envsubst EPC esbuild @@ -293,6 +294,7 @@ kubeval Kurio kustomization kustomize +Kusto kyocera Kyros Lenco diff --git a/Cargo.lock b/Cargo.lock index 5e5a877676da5..160bd6bc7b2a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1718,6 +1718,23 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "azure_identity" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f07bb0ee212021e75c3645e82d078e436b4b4184bde1295e9e81fcbcef923af" +dependencies = [ + "async-lock 3.4.0", + "async-trait", + "azure_core", + "futures 0.3.31", + "pin-project", + "serde", + "time", + "tracing 0.1.41", + "url", +] + [[package]] name = "azure_storage_blob" version = "0.7.0" @@ -12517,6 +12534,7 @@ dependencies = [ "aws-types", "axum 0.6.20", "azure_core", + "azure_identity", "azure_storage_blob", "base64 0.22.1", "bloomy", diff --git a/Cargo.toml b/Cargo.toml index afbc2e3a0f90e..1d81c9d340b45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -296,6 +296,7 @@ aws-smithy-types = { version = "1.2.11", default-features = false, features = [" # Azure azure_core = { version = "0.30", features = ["reqwest", "hmac_openssl"], optional = true } +azure_identity = { version = "0.30", optional = true } # Azure Storage azure_storage_blob = { version = "0.7", optional = true } @@ -785,6 +786,7 @@ sinks-logs = [ "sinks-aws_sqs", "sinks-axiom", "sinks-azure_blob", + "sinks-azure_data_explorer", "sinks-azure_monitor_logs", "sinks-blackhole", "sinks-chronicle", @@ -852,6 +854,7 @@ sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] sinks-azure_blob = ["dep:azure_core", "dep:azure_storage_blob"] +sinks-azure_data_explorer = 
["dep:azure_core", "dep:azure_identity", "dep:base64"] sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] diff --git a/changelog.d/azure_data_explorer_sink.feature.md b/changelog.d/azure_data_explorer_sink.feature.md new file mode 100644 index 0000000000000..75c3dcf7ab0fb --- /dev/null +++ b/changelog.d/azure_data_explorer_sink.feature.md @@ -0,0 +1,3 @@ +A new `azure_data_explorer` sink has been added to deliver log events to Azure Data Explorer (Kusto) via queued ingestion. The sink supports Azure Entra ID (Azure AD) service principal authentication, configurable ingestion mapping references, gzip compression, and flexible batching options. Data is uploaded as JSONL to Azure Blob Storage and then enqueued for ingestion. + +authors: benmali diff --git a/src/sinks/azure_data_explorer/auth.rs b/src/sinks/azure_data_explorer/auth.rs new file mode 100644 index 0000000000000..618ca4ae0be5c --- /dev/null +++ b/src/sinks/azure_data_explorer/auth.rs @@ -0,0 +1,107 @@ +//! Azure Entra ID authentication for Azure Data Explorer. +//! +//! Uses [`azure_identity::ClientSecretCredential`] (from the official Azure SDK +//! for Rust) for service-principal client-credentials authentication, rather +//! than a hand-rolled OAuth2 flow. + +use std::sync::Arc; + +use azure_core::credentials::{Secret, TokenCredential}; +use azure_identity::ClientSecretCredential; +use vector_lib::sensitive_string::SensitiveString; + +/// Scope for Azure Data Explorer / Kusto API access. +const KUSTO_SCOPE: &str = "https://kusto.kusto.windows.net/.default"; + +// --------------------------------------------------------------------------- +// Internal trait: allows swapping in a mock for tests without needing to +// construct `azure_core::credentials::AccessToken` (which requires the `time` +// crate's `OffsetDateTime`). 
+// --------------------------------------------------------------------------- + +#[async_trait::async_trait] +trait TokenProvider: Send + Sync { + async fn get_bearer_token(&self) -> crate::Result; +} + +/// Production token provider backed by [`ClientSecretCredential`]. +struct EntraTokenProvider { + credential: Arc, +} + +#[async_trait::async_trait] +impl TokenProvider for EntraTokenProvider { + async fn get_bearer_token(&self) -> crate::Result { + let access_token = self + .credential + .get_token(&[KUSTO_SCOPE], None) + .await + .map_err(|e| format!("Failed to acquire Azure Entra token: {e}"))?; + + Ok(access_token.token.secret().to_string()) + } +} + +// --------------------------------------------------------------------------- +// Public auth wrapper +// --------------------------------------------------------------------------- + +/// Azure Entra ID token provider for Azure Data Explorer. +/// +/// Wraps [`azure_identity::ClientSecretCredential`] to acquire Bearer tokens +/// via the OAuth2 client-credentials flow. Token caching and refresh are +/// handled internally by the Azure SDK. +#[derive(Clone)] +pub(super) struct AzureDataExplorerAuth { + provider: Arc, +} + +impl AzureDataExplorerAuth { + /// Creates a new auth provider backed by [`ClientSecretCredential`]. + pub(super) fn new( + tenant_id: &str, + client_id: String, + client_secret: SensitiveString, + ) -> crate::Result { + let secret = Secret::from(client_secret.inner().to_string()); + let credential = ClientSecretCredential::new(tenant_id, client_id, secret, None) + .map_err(|e| format!("Failed to create Azure credential: {e}"))?; + + Ok(Self { + provider: Arc::new(EntraTokenProvider { credential }), + }) + } + + /// Creates a mock auth provider that always returns the given token. + /// For use in tests only. 
+ #[cfg(test)] + pub(super) fn mock(token: impl Into) -> Self { + Self { + provider: Arc::new(MockTokenProvider { + token: token.into(), + }), + } + } + + /// Returns a valid Bearer access token string. + pub(super) async fn get_token(&self) -> crate::Result { + self.provider.get_bearer_token().await + } +} + +// --------------------------------------------------------------------------- +// Test-only mock +// --------------------------------------------------------------------------- + +#[cfg(test)] +struct MockTokenProvider { + token: String, +} + +#[cfg(test)] +#[async_trait::async_trait] +impl TokenProvider for MockTokenProvider { + async fn get_bearer_token(&self) -> crate::Result { + Ok(self.token.clone()) + } +} diff --git a/src/sinks/azure_data_explorer/config.rs b/src/sinks/azure_data_explorer/config.rs new file mode 100644 index 0000000000000..ce0b33fb60d2a --- /dev/null +++ b/src/sinks/azure_data_explorer/config.rs @@ -0,0 +1,233 @@ +//! Configuration for the `azure_data_explorer` sink. +//! +//! Uses **queued ingestion** (blob upload + queue notification), matching the +//! Fluent Bit `out_azure_kusto` plugin. + +use futures::FutureExt; +use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString}; +use vrl::value::Kind; + +use super::{ + auth::AzureDataExplorerAuth, + encoder::AzureDataExplorerEncoder, + request_builder::AzureDataExplorerRequestBuilder, + resources::ResourceManager, + service::{AzureDataExplorerService, QueuedIngestConfig}, + sink::AzureDataExplorerSink, +}; +use crate::{ + http::HttpClient, + sinks::{ + prelude::*, + util::{BatchConfig, http::http_response_retry_logic}, + }, +}; + +/// Configuration for the `azure_data_explorer` sink. +#[configurable_component(sink( + "azure_data_explorer", + "Deliver log events to Azure Data Explorer via queued ingestion." +))] +#[derive(Clone, Debug)] +pub struct AzureDataExplorerConfig { + /// The Kusto cluster's **ingestion** endpoint URL. 
+ /// + /// This is the `ingest-` prefixed URL, e.g. + /// `https://ingest-mycluster.eastus.kusto.windows.net`. + #[configurable(metadata( + docs::examples = "https://ingest-mycluster.eastus.kusto.windows.net", + ))] + #[configurable(validation(format = "uri"))] + pub(super) ingestion_endpoint: String, + + /// The name of the target database. + #[configurable(metadata(docs::examples = "my_database"))] + pub(super) database: String, + + /// The name of the target table inside the database. + #[configurable(metadata(docs::examples = "my_table"))] + pub(super) table: String, + + /// Azure Entra ID (Azure AD) tenant ID for service-principal authentication. + #[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))] + pub(super) tenant_id: String, + + /// Azure Entra ID application (client) ID. + #[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))] + pub(super) client_id: String, + + /// Azure Entra ID application client secret. + #[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET}"))] + pub(super) client_secret: SensitiveString, + + /// Optional ingestion mapping reference name. + /// + /// When set, the value is passed in the ingestion message's + /// `jsonMappingReference` property. + #[serde(default)] + #[configurable(metadata(docs::examples = "my_mapping"))] + pub(super) mapping_reference: Option, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + pub(super) encoding: Transformer, + + /// The compression algorithm to use. + /// + /// When enabled, the JSONL payload is gzip-compressed before blob upload + /// and the blob name ends with `.multijson.gz`. 
+ #[configurable(derived)] + #[serde(default = "Compression::gzip_default")] + pub(super) compression: Compression, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub(super) acknowledgements: AcknowledgementsConfig, +} + +#[derive(Clone, Copy, Debug, Default)] +pub(super) struct AzureDataExplorerDefaultBatchSettings; + +impl SinkBatchSettings for AzureDataExplorerDefaultBatchSettings { + const MAX_EVENTS: Option = Some(1_000); + const MAX_BYTES: Option = Some(4_000_000); // 4 MB + const TIMEOUT_SECS: f64 = 30.0; +} + +impl GenerateConfig for AzureDataExplorerConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"ingestion_endpoint = "https://ingest-mycluster.eastus.kusto.windows.net" + database = "my_database" + table = "my_table" + tenant_id = "${AZURE_TENANT_ID}" + client_id = "${AZURE_CLIENT_ID}" + client_secret = "${AZURE_CLIENT_SECRET}""#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "azure_data_explorer")] +impl SinkConfig for AzureDataExplorerConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let request_builder = AzureDataExplorerRequestBuilder { + encoder: AzureDataExplorerEncoder { + transformer: self.encoding.clone(), + }, + compression: self.compression, + }; + + let client = HttpClient::new(None, cx.proxy())?; + + let auth = AzureDataExplorerAuth::new( + &self.tenant_id, + self.client_id.clone(), + self.client_secret.clone(), + )?; + + // Resource manager handles .get ingestion resources + identity token caching + let resource_manager = ResourceManager::new( + auth.clone(), + client.clone(), + self.ingestion_endpoint.clone(), + ); + + let queued_config = QueuedIngestConfig { + database: self.database.clone(), + table: self.table.clone(), + mapping_reference: 
self.mapping_reference.clone(), + compression: self.compression, + }; + + let service = + AzureDataExplorerService::new(client.clone(), resource_manager.clone(), queued_config); + + let request_limits = self.request.into_settings(); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = AzureDataExplorerSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(self.ingestion_endpoint.clone(), auth).boxed(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +/// Validates credentials and ingestion endpoint reachability by: +/// 1. Acquiring an Entra token (validates service-principal credentials) +/// 2. 
Executing a lightweight `.show version` management command +async fn healthcheck( + ingestion_endpoint: String, + auth: AzureDataExplorerAuth, +) -> crate::Result<()> { + let token = auth.get_token().await?; + + let mgmt_uri = format!( + "{}/v1/rest/mgmt", + ingestion_endpoint.trim_end_matches('/') + ); + + let body = serde_json::json!({ + "csl": ".show version", + "db": "NetDefaultDB" + }); + let body_bytes = bytes::Bytes::from(serde_json::to_vec(&body)?); + + let request = http::Request::post(&mgmt_uri) + .header("Authorization", format!("Bearer {}", token)) + .header("Content-Type", "application/json") + .body(hyper::Body::from(body_bytes))?; + + let client = HttpClient::new(None, &Default::default())?; + let response = client.send(request).await?; + let status = response.status(); + + if status.is_success() { + Ok(()) + } else if status == http::StatusCode::UNAUTHORIZED || status == http::StatusCode::FORBIDDEN { + Err(format!( + "Azure Data Explorer authentication failed (HTTP {}). \ + Verify tenant_id, client_id, and client_secret.", + status + ) + .into()) + } else { + let body = http_body::Body::collect(response.into_body()) + .await? + .to_bytes(); + let body_str = String::from_utf8_lossy(&body); + Err(format!( + "Azure Data Explorer healthcheck failed: HTTP {} - {}", + status, body_str + ) + .into()) + } +} diff --git a/src/sinks/azure_data_explorer/encoder.rs b/src/sinks/azure_data_explorer/encoder.rs new file mode 100644 index 0000000000000..acd4087989217 --- /dev/null +++ b/src/sinks/azure_data_explorer/encoder.rs @@ -0,0 +1,60 @@ +//! JSONL encoder for the `azure_data_explorer` sink. +//! +//! Each event is serialized to JSON exactly as it exists at the sink boundary +//! (no wrapping, no added/removed fields). Events are joined with newlines +//! to produce JSON Lines / MultiJSON output. 
+ +use std::io; + +use crate::sinks::{ + prelude::*, + util::encoding::{write_all, Encoder as SinkEncoder}, +}; + +pub(super) struct AzureDataExplorerEncoder { + pub(super) transformer: Transformer, +} + +impl SinkEncoder> for AzureDataExplorerEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let n_events = events.len(); + let mut body = Vec::new(); + + for (i, mut event) in events.into_iter().enumerate() { + self.transformer.transform(&mut event); + + byte_size.add_event(&event, event.estimated_json_encoded_size_of()); + + // Convert to LogEvent and serialize the inner value directly to JSON. + // This preserves the original event structure without Vector's internal metadata. + let log = event.into_log(); + + // Serialize to JSON + serde_json::to_writer(&mut body, log.value())?; + + // Newline delimiter between events (MultiJSON / JSONL format). + if i < n_events - 1 { + body.push(b'\n'); + } + } + + // Debug: Log the first 2000 bytes of the payload + if !body.is_empty() { + let preview_len = body.len().min(2000); + let preview = String::from_utf8_lossy(&body[..preview_len]); + debug!( + message = "Encoded payload for ADX", + n_events = n_events, + total_bytes = body.len(), + preview = %preview, + ); + } + + write_all(writer, n_events, &body).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/azure_data_explorer/mod.rs b/src/sinks/azure_data_explorer/mod.rs new file mode 100644 index 0000000000000..ab5f1a2a43f2f --- /dev/null +++ b/src/sinks/azure_data_explorer/mod.rs @@ -0,0 +1,17 @@ +//! The Azure Data Explorer (ADX / Kusto) [`vector_lib::sink::VectorSink`]. +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`]s and forwarding them to Azure Data Explorer +//! 
via **queued ingestion** (blob upload + queue notification), matching the Fluent Bit +//! `out_azure_kusto` plugin's ingestion flow. + +mod auth; +mod config; +mod encoder; +mod request_builder; +mod resources; +mod service; +mod sink; + +#[cfg(test)] +mod tests; diff --git a/src/sinks/azure_data_explorer/request_builder.rs b/src/sinks/azure_data_explorer/request_builder.rs new file mode 100644 index 0000000000000..b972d3a45b723 --- /dev/null +++ b/src/sinks/azure_data_explorer/request_builder.rs @@ -0,0 +1,48 @@ +//! `RequestBuilder` implementation for the `azure_data_explorer` sink. + +use std::io; + +use bytes::Bytes; + +use super::encoder::AzureDataExplorerEncoder; +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +pub(super) struct AzureDataExplorerRequestBuilder { + pub(super) encoder: AzureDataExplorerEncoder, + pub(super) compression: Compression, +} + +impl RequestBuilder> for AzureDataExplorerRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = AzureDataExplorerEncoder; + type Payload = Bytes; + type Request = HttpRequest<()>; + type Error = io::Error; + + fn compression(&self) -> Compression { + self.compression + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata, ()) + } +} diff --git a/src/sinks/azure_data_explorer/resources.rs b/src/sinks/azure_data_explorer/resources.rs new file mode 100644 index 0000000000000..66ecfd0ac4cd2 --- /dev/null +++ b/src/sinks/azure_data_explorer/resources.rs @@ -0,0 +1,344 @@ +//! 
Ingestion resource discovery and caching for Azure Data Explorer queued ingestion. +//! +//! Implements the `.get ingestion resources` and `.get kusto identity token` +//! management commands to discover blob storage and queue endpoints. +//! Matches the Fluent Bit `azure_kusto_conf.c` / `azure_kusto_ingest.c` flow. + +use std::sync::Arc; +use std::time::Instant; + +use bytes::Bytes; +use http::Request; +use serde_json::Value; +use tokio::sync::RwLock; + +use super::auth::AzureDataExplorerAuth; +use crate::http::HttpClient; + +/// Default refresh interval for ingestion resources (1 hour, matching Fluent Bit). +const RESOURCES_REFRESH_INTERVAL_SECS: u64 = 3600; + +/// ADX management endpoint path. +const MGMT_URI_PATH: &str = "/v1/rest/mgmt"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/// A single storage endpoint (blob or queue) with its SAS token. +#[derive(Clone, Debug)] +pub(super) struct StorageEndpoint { + /// Full base URL without query string + /// e.g. `https://host.blob.core.windows.net/container` + pub base_url: String, + /// SAS query string (without leading `?`) + pub sas_token: String, +} + +/// Cached ingestion resources returned by the ADX management endpoint. +#[derive(Clone, Debug)] +pub(super) struct IngestionResources { + pub blob_endpoints: Vec, + pub queue_endpoints: Vec, + pub identity_token: String, + loaded_at: Instant, +} + +impl IngestionResources { + fn is_stale(&self) -> bool { + self.loaded_at.elapsed().as_secs() >= RESOURCES_REFRESH_INTERVAL_SECS + } +} + +// --------------------------------------------------------------------------- +// ResourceManager +// --------------------------------------------------------------------------- + +/// Manages ingestion resource discovery and caching. 
+/// +/// Periodically refreshes blob/queue SAS URIs and the Kusto identity token +/// by executing management commands against the ADX ingestion endpoint. +#[derive(Clone)] +pub(super) struct ResourceManager { + auth: AzureDataExplorerAuth, + http_client: HttpClient, + ingestion_endpoint: String, + cached: Arc>>, +} + +impl ResourceManager { + pub(super) fn new( + auth: AzureDataExplorerAuth, + http_client: HttpClient, + ingestion_endpoint: String, + ) -> Self { + Self { + auth, + http_client, + ingestion_endpoint, + cached: Arc::new(RwLock::new(None)), + } + } + + /// Returns cached resources, refreshing them if stale or absent. + pub(super) async fn get_resources(&self) -> crate::Result { + // Fast path: check if cached resources are still fresh. + { + let cached = self.cached.read().await; + if let Some(ref resources) = *cached { + if !resources.is_stale() { + return Ok(resources.clone()); + } + } + } + + // Slow path: refresh resources. + let resources = self.load_resources().await?; + { + let mut cached = self.cached.write().await; + *cached = Some(resources.clone()); + } + Ok(resources) + } + + async fn load_resources(&self) -> crate::Result { + // Step 1: Get ingestion resources (blob + queue endpoints) + let resources_response = self + .execute_mgmt_command(".get ingestion resources") + .await?; + let (blob_endpoints, queue_endpoints) = parse_storage_resources(&resources_response)?; + + if blob_endpoints.is_empty() { + return Err("No blob storage endpoints returned by ADX".into()); + } + if queue_endpoints.is_empty() { + return Err("No queue endpoints returned by ADX".into()); + } + + // Step 2: Get identity token + let identity_response = self + .execute_mgmt_command(".get kusto identity token") + .await?; + let identity_token = parse_identity_token(&identity_response)?; + + info!( + message = "Loaded ADX ingestion resources.", + blob_count = blob_endpoints.len(), + queue_count = queue_endpoints.len(), + ); + + Ok(IngestionResources { + blob_endpoints, + 
queue_endpoints, + identity_token, + loaded_at: Instant::now(), + }) + } + + /// Executes a CSL management command against the ingestion endpoint. + async fn execute_mgmt_command(&self, csl: &str) -> crate::Result { + let token = self.auth.get_token().await?; + let mgmt_uri = format!( + "{}{}", + self.ingestion_endpoint.trim_end_matches('/'), + MGMT_URI_PATH + ); + + let body = serde_json::json!({ + "csl": csl, + "db": "NetDefaultDB" + }); + let body_bytes = Bytes::from(serde_json::to_vec(&body)?); + + let request = Request::post(&mgmt_uri) + .header("Authorization", format!("Bearer {}", token)) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .body(hyper::Body::from(body_bytes))?; + + let response = self.http_client.send(request).await?; + let status = response.status(); + let body = http_body::Body::collect(response.into_body()) + .await? + .to_bytes(); + let body_str = String::from_utf8_lossy(&body).to_string(); + + if status.is_success() { + Ok(body_str) + } else { + Err(format!( + "ADX management command '{}' failed: HTTP {} - {}", + csl, + status, + &body_str[..body_str.len().min(500)] + ) + .into()) + } + } +} + +// --------------------------------------------------------------------------- +// Parsing helpers +// --------------------------------------------------------------------------- + +/// Parses the `.get ingestion resources` response into blob and queue endpoints. +/// +/// Response format (JSON): +/// ```json +/// { +/// "Tables": [{ +/// "Rows": [ +/// ["TempStorage", "https://host.blob.core.windows.net/container?sas"], +/// ["SecuredReadyForAggregationQueue", "https://host.queue.core.windows.net/queue?sas"], +/// ... 
+/// ] +/// }] +/// } +/// ``` +fn parse_storage_resources( + response: &str, +) -> crate::Result<(Vec, Vec)> { + let json: Value = serde_json::from_str(response) + .map_err(|e| format!("Failed to parse ingestion resources response: {e}"))?; + + let mut blob_endpoints = Vec::new(); + let mut queue_endpoints = Vec::new(); + + let rows = json + .get("Tables") + .and_then(|t| t.get(0)) + .and_then(|t| t.get("Rows")) + .and_then(|r| r.as_array()) + .ok_or("Unexpected ingestion resources response format")?; + + for row in rows { + let row_arr = match row.as_array() { + Some(arr) if arr.len() >= 2 => arr, + _ => continue, + }; + + let resource_type = row_arr[0].as_str().unwrap_or(""); + let resource_uri = row_arr[1].as_str().unwrap_or(""); + + if resource_uri.is_empty() { + continue; + } + + let endpoint = match parse_sas_url(resource_uri) { + Some(ep) => ep, + None => continue, + }; + + match resource_type { + "TempStorage" => blob_endpoints.push(endpoint), + "SecuredReadyForAggregationQueue" => queue_endpoints.push(endpoint), + _ => {} // Ignore other resource types (e.g. FailedIngestionsQueue) + } + } + + Ok((blob_endpoints, queue_endpoints)) +} + +/// Parses the `.get kusto identity token` response. +/// +/// Response format (JSON): +/// ```json +/// { +/// "Tables": [{ +/// "Rows": [[""]] +/// }] +/// } +/// ``` +fn parse_identity_token(response: &str) -> crate::Result { + let json: Value = serde_json::from_str(response) + .map_err(|e| format!("Failed to parse identity token response: {e}"))?; + + let token = json + .get("Tables") + .and_then(|t| t.get(0)) + .and_then(|t| t.get("Rows")) + .and_then(|r| r.as_array()) + .and_then(|rows| rows.first()) + .and_then(|row| row.as_array()) + .and_then(|arr| arr.first()) + .and_then(|v| v.as_str()) + .ok_or("Failed to extract identity token from ADX response")?; + + Ok(token.to_string()) +} + +/// Splits a SAS URL into base URL and SAS token query string. 
///
/// Input: `https://host.blob.core.windows.net/container?sv=...&sig=...`
/// Output: `StorageEndpoint { base_url: "https://...", sas_token: "sv=...&sig=..." }`
fn parse_sas_url(url: &str) -> Option<StorageEndpoint> {
    // `split_once` yields `None` when there is no `?`, which the caller
    // treats as "not a SAS-authenticated URL".
    let (base_url, sas_token) = url.split_once('?')?;
    Some(StorageEndpoint {
        base_url: base_url.to_string(),
        sas_token: sas_token.to_string(),
    })
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_sas_url_splits_correctly() {
        let ep =
            parse_sas_url("https://host.blob.core.windows.net/container?sv=2019&sig=abc").unwrap();
        assert_eq!(ep.base_url, "https://host.blob.core.windows.net/container");
        assert_eq!(ep.sas_token, "sv=2019&sig=abc");
    }

    #[test]
    fn parse_sas_url_returns_none_without_query() {
        assert!(parse_sas_url("https://host.blob.core.windows.net/container").is_none());
    }

    #[test]
    fn parse_storage_resources_extracts_endpoints() {
        // Five rows go in, but only `TempStorage` rows become blob endpoints
        // and only `SecuredReadyForAggregationQueue` rows become queue
        // endpoints; success/failure notification queues are ignored.
        let response = r#"{
            "Tables": [{
                "TableName": "Table_0",
                "Columns": [],
                "Rows": [
                    ["TempStorage", "https://blob1.blob.core.windows.net/c1?sas1"],
                    ["TempStorage", "https://blob2.blob.core.windows.net/c2?sas2"],
                    ["SecuredReadyForAggregationQueue", "https://queue1.queue.core.windows.net/q1?sas3"],
                    ["SuccessfulIngestionsQueue", "https://other.queue.core.windows.net/q2?sas4"],
                    ["FailedIngestionsQueue", "https://other2.queue.core.windows.net/q3?sas5"]
                ]
            }]
        }"#;

        let (blobs, queues) = parse_storage_resources(response).unwrap();
        assert_eq!(blobs.len(), 2);
        assert_eq!(queues.len(), 1);
        assert_eq!(blobs[0].base_url, "https://blob1.blob.core.windows.net/c1");
        assert_eq!(blobs[0].sas_token, "sas1");
        assert_eq!(
            queues[0].base_url,
            "https://queue1.queue.core.windows.net/q1"
        );
        assert_eq!(queues[0].sas_token, "sas3");
    }

    #[test]
    fn parse_identity_token_extracts_token() {
        // The identity token lives in the single-cell `AuthorizationContext`
        // row of the management-command response.
        let response = r#"{
            "Tables": [{
                "TableName": "Table_0",
                "Columns": [{"ColumnName": "AuthorizationContext"}],
                "Rows": [["my_identity_token_value"]]
            }]
        }"#;

        let token = parse_identity_token(response).unwrap();
        assert_eq!(token, "my_identity_token_value");
    }
}
diff --git a/src/sinks/azure_data_explorer/service.rs b/src/sinks/azure_data_explorer/service.rs
new file mode 100644
index 0000000000000..bead0eabd3191
--- /dev/null
+++ b/src/sinks/azure_data_explorer/service.rs
@@ -0,0 +1,385 @@
//! Service implementation for the `azure_data_explorer` sink.
//!
//! Implements **queued ingestion** matching the Fluent Bit `out_azure_kusto` plugin:
//!
//! 1. Ensure ingestion resources (blob + queue SAS URIs) are loaded and fresh.
//! 2. Upload the JSONL/MultiJSON payload as a blob to Azure Blob Storage (SAS-authenticated PUT).
//! 3. Enqueue an ingestion notification message to Azure Queue Storage (SAS-authenticated POST).
//!
//! The blob and queue endpoints are discovered via the ADX management command
//! `.get ingestion resources` and cached for 1 hour (matching Fluent Bit defaults).

use std::{
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{SystemTime, UNIX_EPOCH},
};

use base64::Engine as _;
use bytes::Bytes;
use futures::future::BoxFuture;
use http::Request;
use tower::Service;
use uuid::Uuid;

use super::resources::ResourceManager;
use crate::{
    http::HttpClient,
    sinks::{
        prelude::*,
        util::{
            buffer::compression::Compression,
            http::{HttpRequest, HttpResponse},
        },
    },
};

// ---------------------------------------------------------------------------
// Queued ingest configuration (shared across clones)
// ---------------------------------------------------------------------------

/// Configuration for the queued ingest service, shared across all clones.
+#[derive(Clone, Debug)] +pub(super) struct QueuedIngestConfig { + pub database: String, + pub table: String, + pub mapping_reference: Option, + pub compression: Compression, +} + +// --------------------------------------------------------------------------- +// Service +// --------------------------------------------------------------------------- + +/// A Tower `Service` that performs **queued ingestion** to Azure Data Explorer. +/// +/// Each `call()` invocation: +/// 1. Gets/refreshes ingestion resources (blob + queue SAS URIs + identity token) +/// 2. Uploads the payload to a randomly selected blob endpoint +/// 3. Enqueues an ingestion notification to a randomly selected queue endpoint +/// 4. Returns a synthetic `HttpResponse` indicating success or the first failure +pub(super) struct AzureDataExplorerService { + http_client: HttpClient, + resource_manager: ResourceManager, + config: Arc, + /// Round-robin index for blob endpoint selection. + blob_index: Arc, + /// Round-robin index for queue endpoint selection. 
+ queue_index: Arc, +} + +impl AzureDataExplorerService { + pub(super) fn new( + http_client: HttpClient, + resource_manager: ResourceManager, + config: QueuedIngestConfig, + ) -> Self { + Self { + http_client, + resource_manager, + config: Arc::new(config), + blob_index: Arc::new(AtomicUsize::new(0)), + queue_index: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Service> for AzureDataExplorerService { + type Response = HttpResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, mut request: HttpRequest<()>) -> Self::Future { + let http_client = self.http_client.clone(); + let resource_manager = self.resource_manager.clone(); + let config = Arc::clone(&self.config); + let blob_index = Arc::clone(&self.blob_index); + let queue_index = Arc::clone(&self.queue_index); + + let metadata = std::mem::take(request.metadata_mut()); + let raw_byte_size = metadata.request_encoded_size(); + let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size(); + let payload = request.take_payload(); + + Box::pin(async move { + // 1. Get/refresh ingestion resources + let resources = resource_manager.get_resources().await?; + + // 2. Select blob endpoint (round-robin) + let blob_idx = + blob_index.fetch_add(1, Ordering::Relaxed) % resources.blob_endpoints.len(); + let blob_ep = &resources.blob_endpoints[blob_idx]; + + // 3. Generate unique blob ID + let epoch_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let blob_id = format!( + "vector__{}__{}__{}__{epoch_ms}", + config.database, + config.table, + Uuid::new_v4(), + ); + + let extension = if config.compression.content_encoding().is_some() { + ".multijson.gz" + } else { + ".multijson" + }; + + // 4. 
Upload payload to blob storage + let blob_uri = format!( + "{}/{blob_id}{extension}?{}", + blob_ep.base_url, blob_ep.sas_token + ); + + debug!( + message = "Uploading payload to blob storage.", + blob_uri = %blob_ep.base_url, + blob_id = %blob_id, + payload_bytes = payload.len(), + ); + + let blob_request = Request::put(&blob_uri) + .header("Content-Type", "application/json") + .header("x-ms-blob-type", "BlockBlob") + .header("x-ms-version", "2019-12-12") + .header("x-ms-app", "Kusto.Vector") + .header("x-ms-user", "Kusto.Vector") + .body(hyper::Body::from(payload.clone()))?; + + let blob_response = http_client.send(blob_request).await?; + let blob_status = blob_response.status(); + + if blob_status != http::StatusCode::CREATED { + let body = http_body::Body::collect(blob_response.into_body()) + .await? + .to_bytes(); + let body_str = String::from_utf8_lossy(&body); + let err_msg = format!( + "Blob upload failed: HTTP {} - {}", + blob_status, + &body_str[..body_str.len().min(500)] + ); + error!(message = %err_msg); + + // Return a synthetic response with the blob's error status code + // so the retry logic can decide whether to retry. + let synthetic = http::Response::builder() + .status(blob_status) + .body(Bytes::from(err_msg)) + .unwrap(); + return Ok(HttpResponse { + http_response: synthetic, + events_byte_size, + raw_byte_size, + }); + } + + // 5. Build the full blob URI for the ingestion message + let full_blob_uri = format!( + "{}/{blob_id}{extension}?{}", + blob_ep.base_url, blob_ep.sas_token + ); + + // 6. Create ingestion message (matching Fluent Bit's format) + let ingestion_message = create_ingestion_message( + &config, + &full_blob_uri, + payload.len(), + &resources.identity_token, + ); + + // 7. Base64-encode the message and wrap in Azure Queue XML format + let message_b64 = base64::engine::general_purpose::STANDARD.encode(&ingestion_message); + let queue_payload = format!( + "{message_b64}" + ); + + // 8. 
Enqueue ingestion notification (round-robin) + let queue_idx = + queue_index.fetch_add(1, Ordering::Relaxed) % resources.queue_endpoints.len(); + let queue_ep = &resources.queue_endpoints[queue_idx]; + let queue_uri = format!("{}/messages?{}", queue_ep.base_url, queue_ep.sas_token); + + debug!( + message = "Enqueueing ingestion notification.", + queue_uri = %queue_ep.base_url, + ); + + let queue_request = Request::post(&queue_uri) + .header("Content-Type", "application/atom+xml") + .header("x-ms-version", "2019-12-12") + .header("x-ms-app", "Kusto.Vector") + .header("x-ms-user", "Kusto.Vector") + .body(hyper::Body::from(queue_payload))?; + + let queue_response = http_client.send(queue_request).await?; + let queue_status = queue_response.status(); + + if queue_status != http::StatusCode::CREATED { + let body = http_body::Body::collect(queue_response.into_body()) + .await? + .to_bytes(); + let body_str = String::from_utf8_lossy(&body); + let err_msg = format!( + "Queue notification failed: HTTP {} - {}", + queue_status, + &body_str[..body_str.len().min(500)] + ); + error!(message = %err_msg); + + let synthetic = http::Response::builder() + .status(queue_status) + .body(Bytes::from(err_msg)) + .unwrap(); + return Ok(HttpResponse { + http_response: synthetic, + events_byte_size, + raw_byte_size, + }); + } + + debug!(message = "Queued ingestion completed successfully.", blob_id = %blob_id); + + // 9. 
Return synthetic 200 OK + let synthetic = http::Response::builder() + .status(http::StatusCode::OK) + .body(Bytes::from("queued")) + .unwrap(); + + Ok(HttpResponse { + http_response: synthetic, + events_byte_size, + raw_byte_size, + }) + }) + } +} + +impl Clone for AzureDataExplorerService { + fn clone(&self) -> Self { + Self { + http_client: self.http_client.clone(), + resource_manager: self.resource_manager.clone(), + config: Arc::clone(&self.config), + blob_index: Arc::clone(&self.blob_index), + queue_index: Arc::clone(&self.queue_index), + } + } +} + +// --------------------------------------------------------------------------- +// Ingestion message helpers +// --------------------------------------------------------------------------- + +/// Creates the JSON ingestion message matching Fluent Bit's format: +/// +/// ```json +/// { +/// "Id": "", +/// "BlobPath": "", +/// "RawDataSize": , +/// "DatabaseName": "", +/// "TableName": "", +/// "AdditionalProperties": { +/// "format": "multijson", +/// "authorizationContext": "", +/// "jsonMappingReference": "" +/// } +/// } +/// ``` +fn create_ingestion_message( + config: &QueuedIngestConfig, + blob_uri: &str, + raw_data_size: usize, + identity_token: &str, +) -> String { + let uuid = Uuid::new_v4(); + let mapping = config.mapping_reference.as_deref().unwrap_or(""); + + // Use format! to build compact JSON matching Fluent Bit's output. 
+ format!( + r#"{{"Id":"{uuid}","BlobPath":"{blob_uri}","RawDataSize":{raw_data_size},"DatabaseName":"{}","TableName":"{}","ClientVersionForTracing":"Kusto.Vector:0.1.0","ApplicationForTracing":"Kusto.Vector","AdditionalProperties":{{"format":"multijson","authorizationContext":"{identity_token}","jsonMappingReference":"{mapping}"}}}}"#, + config.database, config.table, + ) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ingestion_message_format() { + let config = QueuedIngestConfig { + database: "testdb".to_string(), + table: "testtable".to_string(), + mapping_reference: Some("my_mapping".to_string()), + compression: Compression::None, + }; + + let msg = create_ingestion_message( + &config, + "https://blob.core.windows.net/c/blob.multijson?sas", + 1234, + "identity_tok", + ); + + let parsed: serde_json::Value = serde_json::from_str(&msg).unwrap(); + assert!(parsed.get("Id").is_some()); + assert_eq!( + parsed["BlobPath"].as_str().unwrap(), + "https://blob.core.windows.net/c/blob.multijson?sas" + ); + assert_eq!(parsed["RawDataSize"].as_u64().unwrap(), 1234); + assert_eq!(parsed["DatabaseName"].as_str().unwrap(), "testdb"); + assert_eq!(parsed["TableName"].as_str().unwrap(), "testtable"); + assert_eq!( + parsed["AdditionalProperties"]["format"].as_str().unwrap(), + "multijson" + ); + assert_eq!( + parsed["AdditionalProperties"]["authorizationContext"] + .as_str() + .unwrap(), + "identity_tok" + ); + assert_eq!( + parsed["AdditionalProperties"]["jsonMappingReference"] + .as_str() + .unwrap(), + "my_mapping" + ); + } + + #[test] + fn ingestion_message_no_mapping() { + let config = QueuedIngestConfig { + database: "db".to_string(), + table: "tbl".to_string(), + mapping_reference: None, + compression: Compression::None, + }; + + let msg = create_ingestion_message(&config, 
"https://blob/path?sas", 42, "tok"); + let parsed: serde_json::Value = serde_json::from_str(&msg).unwrap(); + assert_eq!( + parsed["AdditionalProperties"]["jsonMappingReference"] + .as_str() + .unwrap(), + "" + ); + } +} diff --git a/src/sinks/azure_data_explorer/sink.rs b/src/sinks/azure_data_explorer/sink.rs new file mode 100644 index 0000000000000..811401408ca54 --- /dev/null +++ b/src/sinks/azure_data_explorer/sink.rs @@ -0,0 +1,71 @@ +//! Implementation of the `azure_data_explorer` sink. + +use super::request_builder::AzureDataExplorerRequestBuilder; +use crate::sinks::{ + prelude::*, + util::http::{HttpJsonBatchSizer, HttpRequest}, +}; + +pub(super) struct AzureDataExplorerSink { + service: S, + batch_settings: BatcherSettings, + request_builder: AzureDataExplorerRequestBuilder, +} + +impl AzureDataExplorerSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `AzureDataExplorerSink`. 
+ pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: AzureDataExplorerRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AzureDataExplorerSink +where + S: Service> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/azure_data_explorer/tests.rs b/src/sinks/azure_data_explorer/tests.rs new file mode 100644 index 0000000000000..60ef810968748 --- /dev/null +++ b/src/sinks/azure_data_explorer/tests.rs @@ -0,0 +1,293 @@ +//! Unit and integration tests for the `azure_data_explorer` sink. 

use std::{collections::BTreeMap, convert::Infallible};

use futures::{future::ready, stream};
use http::Response;
use hyper::{Body, Request};

use super::{
    auth::AzureDataExplorerAuth,
    config::AzureDataExplorerConfig,
    encoder::AzureDataExplorerEncoder,
    request_builder::AzureDataExplorerRequestBuilder,
    resources::ResourceManager,
    service::{AzureDataExplorerService, QueuedIngestConfig},
    sink::AzureDataExplorerSink,
};
use crate::{
    http::HttpClient,
    sinks::{
        prelude::*,
        util::{
            http::http_response_retry_logic,
            service::GlobalTowerRequestConfigDefaults,
        },
    },
    test_util::{
        components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        http::spawn_blackhole_http_server,
    },
};

// ---------- helpers ----------------------------------------------------------

/// Mock HTTP handler that returns 200/201 for management commands and blob/queue operations.
/// - Management commands (.get ingestion resources, .get kusto identity token): 200
/// - Blob upload (PUT): 201
/// - Queue enqueue (POST to /messages): 201
async fn mock_adx_response(req: Request<Body>) -> Result<Response<Body>, Infallible> {
    let path = req.uri().path().to_string();
    let method = req.method().clone();

    if path.contains("/v1/rest/mgmt") {
        // Check what management command is being sent
        let body = http_body::Body::collect(req.into_body())
            .await
            .map(|c| c.to_bytes())
            .unwrap_or_default();
        let body_str = String::from_utf8_lossy(&body);

        if body_str.contains(".get ingestion resources") {
            // Return mock ingestion resources: one blob container and one
            // aggregation queue, both with dummy SAS query strings.
            let response_body = serde_json::json!({
                "Tables": [{
                    "TableName": "Table_0",
                    "Columns": [],
                    "Rows": [
                        ["TempStorage", "https://mockblob.blob.core.windows.net/container?sv=mock&sig=mock"],
                        ["SecuredReadyForAggregationQueue", "https://mockqueue.queue.core.windows.net/queue?sv=mock&sig=mock"]
                    ]
                }]
            });
            Ok(Response::new(Body::from(response_body.to_string())))
        } else if body_str.contains(".get kusto identity token") {
            let response_body = serde_json::json!({
                "Tables": [{
                    "TableName": "Table_0",
                    "Columns": [{"ColumnName": "AuthorizationContext"}],
                    "Rows": [["mock_identity_token"]]
                }]
            });
            Ok(Response::new(Body::from(response_body.to_string())))
        } else {
            // Other management commands (e.g. .show version for healthcheck)
            Ok(Response::new(Body::from("{}")))
        }
    } else if method == http::Method::PUT {
        // Blob upload - return 201 Created
        Ok(Response::builder()
            .status(201)
            .body(Body::empty())
            .unwrap())
    } else if path.contains("/messages") {
        // Queue enqueue - return 201 Created
        Ok(Response::builder()
            .status(201)
            .body(Body::empty())
            .unwrap())
    } else {
        Ok(Response::new(Body::from("{}")))
    }
}

// ---------- unit tests -------------------------------------------------------

#[test]
fn generate_config() {
    crate::test_util::test_generate_config::<AzureDataExplorerConfig>();
}

#[test]
fn config_parses_with_required_fields() {
    let _config: AzureDataExplorerConfig = toml::from_str(
        r#"
        ingestion_endpoint = "https://ingest-mycluster.eastus.kusto.windows.net"
        database = "mydb"
        table = "mytable"
        tenant_id = "tid"
        client_id = "cid"
        client_secret = "csec"
        "#,
    )
    .unwrap();
}

#[test]
fn config_parses_with_mapping() {
    let config: AzureDataExplorerConfig = toml::from_str(
        r#"
        ingestion_endpoint = "https://ingest-mycluster.eastus.kusto.windows.net/"
        database = "mydb"
        table = "mytable"
        tenant_id = "tid"
        client_id = "cid"
        client_secret = "csec"
        mapping_reference = "my_mapping"
        "#,
    )
    .unwrap();

    assert_eq!(config.mapping_reference.as_deref(), Some("my_mapping"));
}

#[test]
fn encoder_produces_jsonl_without_mutation() {
    use crate::sinks::util::encoding::Encoder as SinkEncoder;

    let encoder = AzureDataExplorerEncoder {
        transformer: Transformer::default(),
    };

    let mut event1 = LogEvent::default();
    event1.insert("message", "hello world");
    event1.insert("host", "server01");

    let mut event2 = LogEvent::default();
    event2.insert("message", "second event");
    event2.insert("count", 42);

    let events = vec![Event::Log(event1), Event::Log(event2)];

    let mut buf = Vec::new();
    let (written, _byte_size) = encoder.encode_input(events, &mut buf).unwrap();
    assert!(written > 0);

    // Two events => exactly two newline-separated JSON documents.
    let output = String::from_utf8(buf).unwrap();
    let lines: Vec<&str> = output.split('\n').collect();
    assert_eq!(lines.len(), 2, "Expected 2 JSONL lines, got: {lines:?}");

    // Verify each line is valid JSON and contains expected fields.
    let obj1: BTreeMap<String, serde_json::Value> =
        serde_json::from_str(lines[0]).expect("line 0 is valid JSON");
    assert_eq!(
        obj1.get("message").and_then(|v| v.as_str()),
        Some("hello world")
    );
    assert_eq!(
        obj1.get("host").and_then(|v| v.as_str()),
        Some("server01")
    );

    let obj2: BTreeMap<String, serde_json::Value> =
        serde_json::from_str(lines[1]).expect("line 1 is valid JSON");
    assert_eq!(
        obj2.get("message").and_then(|v| v.as_str()),
        Some("second event")
    );
    assert_eq!(obj2.get("count").and_then(|v| v.as_i64()), Some(42));
}

#[test]
fn encoder_single_event_no_trailing_newline() {
    use crate::sinks::util::encoding::Encoder as SinkEncoder;

    let encoder = AzureDataExplorerEncoder {
        transformer: Transformer::default(),
    };

    let mut event = LogEvent::default();
    event.insert("key", "value");

    let events = vec![Event::Log(event)];

    let mut buf = Vec::new();
    encoder.encode_input(events, &mut buf).unwrap();

    let output = String::from_utf8(buf).unwrap();
    assert!(
        !output.ends_with('\n'),
        "single event should have no trailing newline"
    );
    let _: serde_json::Value = serde_json::from_str(&output).expect("output is valid JSON");
}

#[test]
fn encoder_with_transformer_field_filtering() {
    use crate::sinks::util::encoding::Encoder as SinkEncoder;

    // `only_fields` keeps just `message`; everything else must be dropped.
    let transformer: Transformer = toml::from_str(r#"only_fields = ["message"]"#).unwrap();

    let encoder = AzureDataExplorerEncoder { transformer };

    let mut event = LogEvent::default();
    event.insert("message", "keep me");
    event.insert("host", "drop me");
    event.insert("extra", "also drop");

    let events = vec![Event::Log(event)];

    let mut buf = Vec::new();
    encoder.encode_input(events, &mut buf).unwrap();

    let output = String::from_utf8(buf).unwrap();
    let obj: BTreeMap<String, serde_json::Value> =
        serde_json::from_str(&output).expect("valid JSON");

    assert_eq!(
        obj.get("message").and_then(|v| v.as_str()),
        Some("keep me")
    );
    assert!(
        obj.get("host").is_none(),
        "filtered field 'host' should be absent"
    );
    assert!(
        obj.get("extra").is_none(),
        "filtered field 'extra' should be absent"
    );
}

// ---------- integration-style test with mock server --------------------------

#[tokio::test]
async fn component_spec_compliance() {
    // Spawn a mock HTTP server that handles mgmt commands, blob upload, and queue enqueue.
    let mock_endpoint = spawn_blackhole_http_server(mock_adx_response).await;
    let mock_url = mock_endpoint.to_string();

    let client = HttpClient::new(None, &Default::default()).unwrap();

    // Use the mock auth provider — no real Entra token acquisition.
    let auth = AzureDataExplorerAuth::mock("mock-token-for-testing");

    // Resource manager pointing at our mock server
    let resource_manager = ResourceManager::new(auth, client.clone(), mock_url);

    let compression = Compression::gzip_default();

    let queued_config = QueuedIngestConfig {
        database: "testdb".to_string(),
        table: "testtable".to_string(),
        mapping_reference: None,
        compression,
    };

    let request_builder = AzureDataExplorerRequestBuilder {
        encoder: AzureDataExplorerEncoder {
            transformer: Transformer::default(),
        },
        compression,
    };

    let service = AzureDataExplorerService::new(client, resource_manager, queued_config);

    let request_limits =
        TowerRequestConfig::<GlobalTowerRequestConfigDefaults>::default().into_settings();
    let service = ServiceBuilder::new()
        .settings(request_limits, http_response_retry_logic())
        .service(service);

    // NOTE(review): the turbofish parameter below was reconstructed — confirm
    // the batch-settings defaults type used by this sink's config.
    let batch_settings =
        BatchConfig::<RealtimeSizeBasedDefaultBatchSettings>::default()
            .validate()
            .unwrap()
            .into_batcher_settings()
            .unwrap();

    let sink = AzureDataExplorerSink::new(service, batch_settings, request_builder);
    let sink = VectorSink::from_event_streamsink(sink);

    let event = Event::Log(LogEvent::from("simple message"));
    run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
}
diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs
index 75dcd683f46a5..af95236757176 100644
--- a/src/sinks/mod.rs
+++ b/src/sinks/mod.rs
@@ -28,6 +28,8 @@ pub mod axiom;
 pub mod azure_blob;
 #[cfg(feature = "sinks-azure_blob")]
 pub mod azure_common;
+#[cfg(feature = "sinks-azure_data_explorer")]
+pub mod azure_data_explorer;
 #[cfg(feature = "sinks-azure_monitor_logs")]
 pub mod azure_monitor_logs;
 #[cfg(feature = "sinks-blackhole")]