From 8b243d08eac03823a87c7d74c65ce12944d0613d Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 2 Mar 2026 06:31:20 +0000 Subject: [PATCH 1/3] Add sample extensions --- rust/otap-dataflow/Cargo.toml | 4 + .../configs/otlp-bearer-auth.yaml | 47 ++ .../crates/contrib-nodes/Cargo.toml | 16 +- .../exporters/azure_monitor_exporter/auth.rs | 380 ----------- .../azure_monitor_exporter/client.rs | 201 ++---- .../azure_monitor_exporter/config.rs | 98 +-- .../exporters/azure_monitor_exporter/error.rs | 89 +-- .../azure_monitor_exporter/exporter.rs | 129 +--- .../azure_monitor_exporter/heartbeat.rs | 110 +-- .../in_flight_exports.rs | 12 + .../exporters/azure_monitor_exporter/mod.rs | 1 - .../azure_monitor_exporter/otlp-ame.yaml | 32 +- .../azure_monitor_exporter/transformer.rs | 11 +- .../src/extensions/azure_identity_auth/mod.rs | 644 ++++++++++++++++++ .../src/extensions/bearer_token_auth/mod.rs | 256 +++++++ .../contrib-nodes/src/extensions/mod.rs | 12 + .../crates/contrib-nodes/src/lib.rs | 3 + .../crates/engine/src/extensions/auth.rs | 65 ++ .../crates/otap/src/otap_grpc/middleware.rs | 1 + .../otap/src/otap_grpc/middleware/auth.rs | 60 ++ .../crates/otap/src/otlp_receiver.rs | 59 +- 21 files changed, 1406 insertions(+), 824 deletions(-) create mode 100644 rust/otap-dataflow/configs/otlp-bearer-auth.yaml delete mode 100644 rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/auth.rs create mode 100644 rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs create mode 100644 rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs create mode 100644 rust/otap-dataflow/crates/contrib-nodes/src/extensions/mod.rs create mode 100644 rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index bd7134fef4..6500281853 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -216,6 +216,10 @@ unchecked-arithmetic = [] # Experimental features experimental-tls = ["otap-df-otap/experimental-tls", "dep:rustls"] +# Contrib extensions (opt-in) - now in contrib-nodes +contrib-extensions = ["otap-df-contrib-nodes/contrib-extensions"] +azure-identity-auth-extension = ["otap-df-contrib-nodes/azure-identity-auth-extension"] +bearer-token-auth-extension = ["otap-df-contrib-nodes/bearer-token-auth-extension"] # Contrib exporters (opt-in) - now in contrib-nodes contrib-exporters = ["otap-df-contrib-nodes/contrib-exporters"] geneva-exporter = ["otap-df-contrib-nodes/geneva-exporter"] diff --git a/rust/otap-dataflow/configs/otlp-bearer-auth.yaml b/rust/otap-dataflow/configs/otlp-bearer-auth.yaml new file mode 100644 index 0000000000..2d70f5b1a1 --- /dev/null +++ b/rust/otap-dataflow/configs/otlp-bearer-auth.yaml @@ -0,0 +1,47 @@ +# OTLP Receiver with Bearer Token Auth Extension +# +# This configuration demonstrates server-side authentication using the +# bearer_token_auth extension. The extension validates that incoming +# gRPC requests carry an `Authorization: Bearer ` header matching +# the configured static token; requests without a valid token are rejected +# with gRPC UNAUTHENTICATED status. + +version: otel_dataflow/v1 +engine: { } + +groups: + default: + pipelines: + main: + policies: + channel_capacity: + control: + node: 100 + pipeline: 100 + pdata: 128 + + nodes: + # Extension node: provides server-side auth for receivers. + # Extensions are regular nodes identified by their URN prefix. + # They are started before pipeline components and stopped after. + bearer_auth: + type: "urn:otel:extension:bearer_token_auth" + config: + token: "my-secret-token" + + receiver: + type: receiver:otlp + config: + # Reference the auth extension by its node name. + # The standardized `auth.authenticator` field mirrors the + # Go Collector's configauth.Authentication pattern. + auth: + authenticator: "bearer_auth" + protocols: + grpc: + listening_addr: "127.0.0.1:4317" + + exporter: + type: exporter:debug + config: + verbosity: basic diff --git a/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml b/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml index 5feadea6fe..6bb4a5c145 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml +++ b/rust/otap-dataflow/crates/contrib-nodes/Cargo.toml @@ -50,6 +50,20 @@ reqwest = { workspace = true, optional = true, features = ["rustls"] } sysinfo = { workspace = true, optional = true } [features] +contrib-extensions = [ + "azure-identity-auth-extension", + "bearer-token-auth-extension", +] +azure-identity-auth-extension = [ + "dep:azure_identity", + "dep:azure_core", + "dep:http", + "dep:rand", +] +bearer-token-auth-extension = [ + "dep:http", +] + contrib-exporters = [ "geneva-exporter", "azure-monitor-exporter", @@ -60,9 +74,7 @@ geneva-exporter = [ ] azure-monitor-exporter = [ "dep:opentelemetry-proto", - "dep:azure_identity", "dep:reqwest", - "dep:azure_core", "dep:ahash", "dep:sysinfo", "dep:chrono", diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/auth.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/auth.rs deleted file mode 100644 index db41ba414d..0000000000 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/auth.rs +++ /dev/null @@ -1,380 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -use azure_core::credentials::{AccessToken, TokenCredential}; -use azure_identity::{ - DeveloperToolsCredential, DeveloperToolsCredentialOptions, ManagedIdentityCredential, - ManagedIdentityCredentialOptions, UserAssignedId, -}; -use otap_df_telemetry::{otel_debug, otel_info, otel_warn}; -use std::sync::Arc; - -use super::Error; -use super::config::{AuthConfig, AuthMethod}; -use super::metrics::AzureMonitorExporterMetricsRc; - -/// Minimum delay between token refresh retry attempts in seconds. -const MIN_RETRY_DELAY_SECS: f64 = 5.0; -/// Maximum delay between token refresh retry attempts in seconds. -const MAX_RETRY_DELAY_SECS: f64 = 30.0; -/// Maximum jitter percentage (±10%) to add to retry delays. -const MAX_RETRY_JITTER_RATIO: f64 = 0.10; - -#[derive(Clone, Debug)] -// TODO - Consolidate with crates/otap/src/{cloud_auth,object_store)/azure.rs -pub struct Auth { - credential: Arc, - scope: String, - metrics: AzureMonitorExporterMetricsRc, -} - -impl Auth { - pub fn new( - auth_config: &AuthConfig, - metrics: AzureMonitorExporterMetricsRc, - ) -> Result { - let credential = Self::create_credential(auth_config)?; - - Ok(Self { - credential, - scope: auth_config.scope.clone(), - metrics, - }) - } - - #[cfg(test)] - pub fn from_credential( - credential: Arc, - scope: String, - metrics: AzureMonitorExporterMetricsRc, - ) -> Self { - Self { - credential, - scope, - metrics, - } - } - - async fn get_token_internal(&self) -> Result { - let token_response = self - .credential - .get_token( - &[&self.scope], - Some(azure_core::credentials::TokenRequestOptions::default()), - ) - .await - .map_err(Error::token_acquisition)?; - - Ok(token_response) - } - - pub async fn get_token(&mut self) -> Result { - let mut attempt = 0_i32; - let start = tokio::time::Instant::now(); - loop { - attempt += 1; - - match self.get_token_internal().await { - Ok(token) => { - otel_debug!("azure_monitor_exporter.auth.get_token_succeeded", expires_on = %token.expires_on); - let mut m = self.metrics.borrow_mut(); - m.add_auth_success_latency(start.elapsed().as_millis() as f64); - return Ok(token); - } - Err(e) => { - otel_warn!("azure_monitor_exporter.auth.get_token_failed", attempt = attempt, error = %e); - self.metrics.borrow_mut().add_auth_failure(); - } - } - - // Calculate exponential backoff: 5s, 10s, 20s, 30s (capped) - let base_delay_secs = MIN_RETRY_DELAY_SECS * 2.0_f64.powi(attempt - 1); - let capped_delay_secs = base_delay_secs.min(MAX_RETRY_DELAY_SECS); - - // Add jitter: random value between -10% and +10% of the delay - let jitter_range = capped_delay_secs * MAX_RETRY_JITTER_RATIO; - let jitter = if jitter_range > 0.0 { - let random_factor = rand::random::() * 2.0 - 1.0; - random_factor * jitter_range - } else { - 0.0 - }; - - let delay_secs = (capped_delay_secs + jitter).max(1.0); - let delay = tokio::time::Duration::from_secs_f64(delay_secs); - - otel_warn!( - "azure_monitor_exporter.auth.retry_scheduled", - delay_secs = %delay_secs - ); - tokio::time::sleep(delay).await; - } - } - - fn create_credential(auth_config: &AuthConfig) -> Result, Error> { - match auth_config.method { - AuthMethod::ManagedIdentity => { - let mut options = ManagedIdentityCredentialOptions::default(); - - if let Some(client_id) = &auth_config.client_id { - otel_info!("azure_monitor_exporter.auth.credential_type", method = "user_assigned_managed_identity", client_id = %client_id); - options.user_assigned_id = Some(UserAssignedId::ClientId(client_id.clone())); - } else { - otel_info!( - "azure_monitor_exporter.auth.credential_type", - method = "system_assigned_managed_identity" - ); - } - - Ok(ManagedIdentityCredential::new(Some(options)) - .map_err(|e| Error::create_credential(AuthMethod::ManagedIdentity, e))?) - } - AuthMethod::Development => { - otel_info!( - "azure_monitor_exporter.auth.credential_type", - method = "developer_tools" - ); - Ok( - DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default())) - .map_err(|e| Error::create_credential(AuthMethod::Development, e))?, - ) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::super::metrics::{AzureMonitorExporterMetrics, AzureMonitorExporterMetricsTracker}; - use super::*; - use azure_core::credentials::TokenRequestOptions; - use azure_core::time::OffsetDateTime; - use otap_df_telemetry::registry::TelemetryRegistryHandle; - use otap_df_telemetry::testing::EmptyAttributes; - use std::cell::RefCell; - use std::rc::Rc; - use std::sync::atomic::{AtomicUsize, Ordering}; - - fn create_test_metrics() -> AzureMonitorExporterMetricsRc { - let registry = TelemetryRegistryHandle::new(); - let metric_set = - registry.register_metric_set::(EmptyAttributes()); - Rc::new(RefCell::new(AzureMonitorExporterMetricsTracker::new( - metric_set, - ))) - } - - #[derive(Debug)] - struct MockCredential { - token: String, - expires_in: azure_core::time::Duration, - call_count: Arc, - } - - fn make_mock_credential( - token: &str, - expires_in: azure_core::time::Duration, - call_count: Arc, - ) -> Arc { - let cred: Arc = Arc::new(MockCredential { - token: token.to_string(), - expires_in, - call_count, - }); - cred - } - - #[async_trait::async_trait] - impl TokenCredential for MockCredential { - async fn get_token( - &self, - _scopes: &[&str], - _options: Option>, - ) -> azure_core::Result { - let _ = self.call_count.fetch_add(1, Ordering::SeqCst); - - Ok(AccessToken { - token: self.token.clone().into(), - expires_on: OffsetDateTime::now_utc() + self.expires_in, - }) - } - } - - // ==================== Construction Tests ==================== - - #[tokio::test] - async fn test_from_credential_creates_auth() { - let credential = make_mock_credential( - "test_token", - azure_core::time::Duration::minutes(60), - Arc::new(AtomicUsize::new(0)), - ); - - let auth = - Auth::from_credential(credential, "test_scope".to_string(), create_test_metrics()); - assert_eq!(auth.scope, "test_scope"); - } - - #[tokio::test] - async fn test_new_with_managed_identity_user_assigned() { - let auth_config = AuthConfig { - method: AuthMethod::ManagedIdentity, - client_id: Some("test-client-id".to_string()), - scope: "https://test.scope".to_string(), - }; - - let auth = Auth::new(&auth_config, create_test_metrics()); - assert!(auth.is_ok()); - let auth = auth.unwrap(); - assert_eq!(auth.scope, "https://test.scope"); - } - - #[tokio::test] - async fn test_new_with_managed_identity_system_assigned() { - let auth_config = AuthConfig { - method: AuthMethod::ManagedIdentity, - client_id: None, - scope: "https://test.scope".to_string(), - }; - - let auth = Auth::new(&auth_config, create_test_metrics()); - assert!(auth.is_ok()); - } - - #[tokio::test] - async fn test_new_with_development_auth() { - let auth_config = AuthConfig { - method: AuthMethod::Development, - client_id: None, - scope: "https://test.scope".to_string(), - }; - - // May fail if Azure CLI not installed - both outcomes are valid - let result = Auth::new(&auth_config, create_test_metrics()); - match result { - Ok(auth) => assert_eq!(auth.scope, "https://test.scope"), - Err(Error::Auth { - kind: super::super::error::AuthErrorKind::CreateCredential { method }, - .. - }) => { - assert_eq!(method, AuthMethod::Development); - } - Err(err) => panic!("Unexpected error type: {:?}", err), - } - } - - // ==================== Token Fetching Tests ==================== - - #[tokio::test] - async fn test_get_token_internal_returns_valid_token() { - let call_count = Arc::new(AtomicUsize::new(0)); - let credential = make_mock_credential( - "test_token", - azure_core::time::Duration::minutes(60), - call_count.clone(), - ); - - let auth = Auth::from_credential(credential, "scope".to_string(), create_test_metrics()); - - let token = auth.get_token_internal().await.unwrap(); - assert_eq!(token.token.secret(), "test_token"); - assert_eq!(call_count.load(Ordering::SeqCst), 1); - } - - #[tokio::test] - async fn test_get_token_internal_calls_credential_each_time() { - let call_count = Arc::new(AtomicUsize::new(0)); - let credential = make_mock_credential( - "test_token", - azure_core::time::Duration::minutes(60), - call_count.clone(), - ); - - let auth = Auth::from_credential(credential, "scope".to_string(), create_test_metrics()); - - // Each call to get_token_internal should call the credential - let _ = auth.get_token_internal().await.unwrap(); - assert_eq!(call_count.load(Ordering::SeqCst), 1); - - let _ = auth.get_token_internal().await.unwrap(); - assert_eq!(call_count.load(Ordering::SeqCst), 2); - - let _ = auth.get_token_internal().await.unwrap(); - assert_eq!(call_count.load(Ordering::SeqCst), 3); - } - - #[tokio::test] - async fn test_get_token_internal_returns_cloned_tokens() { - let credential = make_mock_credential( - "test_token", - azure_core::time::Duration::minutes(60), - Arc::new(AtomicUsize::new(0)), - ); - - let auth = Auth::from_credential(credential, "scope".to_string(), create_test_metrics()); - - let token1 = auth.get_token_internal().await.unwrap(); - let token2 = auth.get_token_internal().await.unwrap(); - - // Same value from both calls - assert_eq!(token1.token.secret(), token2.token.secret()); - } - - // ==================== Error Handling Tests ==================== - - #[tokio::test] - async fn test_get_token_internal_propagates_credential_error() { - #[derive(Debug)] - struct FailingCredential; - - #[async_trait::async_trait] - impl TokenCredential for FailingCredential { - async fn get_token( - &self, - _scopes: &[&str], - _options: Option>, - ) -> azure_core::Result { - Err(azure_core::error::Error::new( - azure_core::error::ErrorKind::Credential, - "Mock credential failure", - )) - } - } - - let cred = FailingCredential; - let credential: Arc = Arc::new(cred); - let auth = Auth::from_credential(credential, "scope".to_string(), create_test_metrics()); - - let result = auth.get_token_internal().await; - assert!(result.is_err()); - match result.unwrap_err() { - Error::Auth { - kind: super::super::error::AuthErrorKind::TokenAcquisition, - .. - } => {} - err => panic!("Expected Auth token acquisition error, got: {:?}", err), - } - } - - // ==================== Clone Behavior Tests ==================== - - #[tokio::test] - async fn test_cloned_auth_shares_credential() { - let call_count = Arc::new(AtomicUsize::new(0)); - let credential = make_mock_credential( - "test_token", - azure_core::time::Duration::minutes(60), - call_count.clone(), - ); - - let auth1 = Auth::from_credential(credential, "scope".to_string(), create_test_metrics()); - let auth2 = auth1.clone(); - - // Both auth instances share the same credential - let _ = auth1.get_token_internal().await.unwrap(); - assert_eq!(call_count.load(Ordering::SeqCst), 1); - - let _ = auth2.get_token_internal().await.unwrap(); - assert_eq!(call_count.load(Ordering::SeqCst), 2); - } -} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs index bdf91cd778..551960c873 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs @@ -3,11 +3,12 @@ use bytes::Bytes; +use otap_df_engine::extensions::auth::ClientAuthenticatorHandle; use otap_df_telemetry::otel_warn; use rand::{RngExt, SeedableRng, rngs::SmallRng}; use reqwest::{ Client, - header::{AUTHORIZATION, CONTENT_ENCODING, CONTENT_TYPE, HeaderValue}, + header::{CONTENT_ENCODING, CONTENT_TYPE}, }; use tokio::time::{Duration, Instant}; @@ -29,8 +30,8 @@ pub struct LogsIngestionClient { http_client: Client, endpoint: String, - // Pre-formatted authorization header provider - auth_header: HeaderValue, + /// Client authenticator handle provided by an auth extension. + auth: ClientAuthenticatorHandle, /// Shared metrics tracker for recording HTTP status codes and latency. metrics: AzureMonitorExporterMetricsRc, @@ -68,23 +69,22 @@ impl LogsIngestionClientPool { Ok(clients) } - pub async fn initialize(&mut self, config: &ApiConfig) -> Result<(), Error> { + pub async fn initialize( + &mut self, + config: &ApiConfig, + auth: ClientAuthenticatorHandle, + ) -> Result<(), Error> { let http_clients = self.create_http_clients(self.clients.capacity())?; for http_client in http_clients { - let client = LogsIngestionClient::new(config, http_client, self.metrics.clone())?; + let client = + LogsIngestionClient::new(config, http_client, auth.clone(), self.metrics.clone())?; self.clients.push(client); } Ok(()) } - pub fn update_auth(&mut self, header: HeaderValue) { - for client in &mut self.clients { - client.update_auth(header.clone()); - } - } - #[inline(always)] pub fn take(&mut self) -> LogsIngestionClient { self.clients.pop().expect("client pool is empty") @@ -99,37 +99,36 @@ impl LogsIngestionClientPool { impl LogsIngestionClient { /// Creates a new Azure Monitor logs ingestion client instance from provided components. /// - /// Primarily used for testing. The auth header is initialized with a placeholder - /// and should be updated via `update_auth()` before making requests. + /// Primarily used for testing. /// /// # Arguments /// * `http_client` - The HTTP client to use for requests /// * `endpoint` - The full endpoint URL for the Azure Monitor ingestion API + /// * `auth` - Client authenticator handle from an auth extension /// /// # Returns - /// A configured client instance with a placeholder auth header + /// A configured client instance #[must_use] pub fn from_parts( http_client: Client, endpoint: String, + auth: ClientAuthenticatorHandle, metrics: AzureMonitorExporterMetricsRc, ) -> Self { Self { http_client, endpoint, - auth_header: HeaderValue::from_static("Bearer "), // placeholder, will be updated on first use + auth, metrics, } } /// Creates a new Azure Monitor logs ingestion client instance from the configuration. /// - /// The auth header is initialized with a placeholder and should be updated - /// via `update_auth()` before making requests. - /// /// # Arguments /// * `config` - The API configuration containing endpoint, DCR, and stream info /// * `http_client` - The HTTP client to use for requests + /// * `auth` - Client authenticator handle from an auth extension /// /// # Returns /// * `Ok(LogsIngestionClient)` - A configured client instance @@ -137,6 +136,7 @@ impl LogsIngestionClient { pub fn new( config: &ApiConfig, http_client: Client, + auth: ClientAuthenticatorHandle, metrics: AzureMonitorExporterMetricsRc, ) -> Result { let endpoint = format!( @@ -147,16 +147,11 @@ impl LogsIngestionClient { Ok(Self { http_client, endpoint, - auth_header: HeaderValue::from_static("Bearer "), // placeholder, will be updated on first use + auth, metrics, }) } - /// Update the authorization header with a new access token. - pub fn update_auth(&mut self, header: HeaderValue) { - self.auth_header = header; - } - /// Export compressed data to Log Analytics ingestion API with automatic retry. /// /// Retries on: @@ -223,16 +218,22 @@ impl LogsIngestionClient { let body_len = body.len(); let start = Instant::now(); - let response = self + let auth_headers = self + .auth + .get_request_metadata() + .map_err(|e| Error::ClientAuthUnavailable(e.to_string()))?; + + let mut request = self .http_client .post(&self.endpoint) .header(CONTENT_TYPE, "application/json") - .header(CONTENT_ENCODING, "gzip") - .header(AUTHORIZATION, &self.auth_header) - .body(body) - .send() - .await - .map_err(Error::network)?; + .header(CONTENT_ENCODING, "gzip"); + + for (name, value) in auth_headers { + request = request.header(name, value); + } + + let response = request.body(body).send().await.map_err(Error::network)?; let status_code = response.status().as_u16(); let elapsed = start.elapsed(); @@ -288,12 +289,31 @@ mod tests { use super::super::metrics::AzureMonitorExporterMetrics; use super::super::metrics::AzureMonitorExporterMetricsTracker; use super::*; + use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator}; use otap_df_telemetry::registry::TelemetryRegistryHandle; use otap_df_telemetry::testing::EmptyAttributes; - use reqwest::header::HeaderValue; use std::cell::RefCell; use std::rc::Rc; + // ==================== Test Auth Helper ==================== + + struct TestAuth; + + impl ClientAuthenticator for TestAuth { + fn get_request_metadata( + &self, + ) -> Result, AuthError> { + Ok(vec![( + http::header::AUTHORIZATION, + http::HeaderValue::from_static("Bearer test_token"), + )]) + } + } + + fn create_test_auth() -> ClientAuthenticatorHandle { + ClientAuthenticatorHandle::new(TestAuth) + } + // ==================== Test Helpers ==================== fn create_test_metrics() -> AzureMonitorExporterMetricsRc { @@ -334,8 +354,13 @@ mod tests { let http_client = create_test_http_client(); - let client = LogsIngestionClient::new(&api_config, http_client, create_test_metrics()) - .expect("failed to create client"); + let client = LogsIngestionClient::new( + &api_config, + http_client, + create_test_auth(), + create_test_metrics(), + ) + .expect("failed to create client"); assert_eq!( client.endpoint, @@ -355,7 +380,8 @@ mod tests { let http_client = create_test_http_client(); let client = - LogsIngestionClient::new(&api_config, http_client, create_test_metrics()).unwrap(); + LogsIngestionClient::new(&api_config, http_client, create_test_auth(), create_test_metrics()) + .unwrap(); assert!(client.endpoint.contains("dcr-abc-123-def")); assert!(client.endpoint.contains("Custom-Stream_Name")); @@ -366,12 +392,11 @@ mod tests { let client = LogsIngestionClient::from_parts( create_test_http_client(), "https://example.com/endpoint".to_string(), + create_test_auth(), create_test_metrics(), ); assert_eq!(client.endpoint, "https://example.com/endpoint"); - // auth_header is placeholder - assert_eq!(client.auth_header, HeaderValue::from_static("Bearer ")); } #[test] @@ -379,58 +404,16 @@ mod tests { let http_client = create_test_http_client(); let api_config = create_test_api_config(); - let client = - LogsIngestionClient::new(&api_config, http_client, create_test_metrics()).unwrap(); - - // Auth header is placeholder - assert_eq!(client.auth_header, HeaderValue::from_static("Bearer ")); - } - - // ==================== Auth Header Update Tests ==================== - - #[test] - fn test_update_auth_changes_header() { - let mut client = LogsIngestionClient::from_parts( - create_test_http_client(), - "https://example.com".to_string(), - create_test_metrics(), - ); - - assert_eq!(client.auth_header, HeaderValue::from_static("Bearer ")); - - client.update_auth(HeaderValue::from_static("Bearer new_token")); - - assert_eq!( - client.auth_header, - HeaderValue::from_static("Bearer new_token") - ); - } - - #[test] - fn test_update_auth_multiple_times() { - let mut client = LogsIngestionClient::from_parts( - create_test_http_client(), - "https://example.com".to_string(), + let client = LogsIngestionClient::new( + &api_config, + http_client, + create_test_auth(), create_test_metrics(), - ); - - client.update_auth(HeaderValue::from_static("Bearer token1")); - assert_eq!( - client.auth_header, - HeaderValue::from_static("Bearer token1") - ); - - client.update_auth(HeaderValue::from_static("Bearer token2")); - assert_eq!( - client.auth_header, - HeaderValue::from_static("Bearer token2") - ); + ) + .unwrap(); - client.update_auth(HeaderValue::from_static("Bearer token3")); - assert_eq!( - client.auth_header, - HeaderValue::from_static("Bearer token3") - ); + // Endpoint is correctly formatted + assert!(client.endpoint.contains("test-dcr")); } // ==================== LogsIngestionClientPool Tests ==================== @@ -459,6 +442,7 @@ mod tests { let client = LogsIngestionClient::new( &api_config, create_test_http_client(), + create_test_auth(), create_test_metrics(), ) .unwrap(); @@ -482,6 +466,7 @@ mod tests { let client = LogsIngestionClient::new( &api_config, create_test_http_client(), + create_test_auth(), create_test_metrics(), ) .unwrap(); @@ -514,6 +499,7 @@ mod tests { let client = LogsIngestionClient::new( &api_config, create_test_http_client(), + create_test_auth(), create_test_metrics(), ) .unwrap(); @@ -530,6 +516,7 @@ mod tests { let client1 = LogsIngestionClient::from_parts( create_test_http_client(), "https://example.com/endpoint".to_string(), + create_test_auth(), create_test_metrics(), ); @@ -538,45 +525,6 @@ mod tests { assert_eq!(client1.endpoint, client2.endpoint); } - #[test] - fn test_client_clone_has_same_auth_header() { - let mut client1 = LogsIngestionClient::from_parts( - create_test_http_client(), - "https://example.com".to_string(), - create_test_metrics(), - ); - - client1.update_auth(HeaderValue::from_static("Bearer test_token")); - let client2 = client1.clone(); - - assert_eq!(client1.auth_header, client2.auth_header); - } - - #[test] - fn test_client_clone_has_independent_header() { - let mut client1 = LogsIngestionClient::from_parts( - create_test_http_client(), - "https://example.com".to_string(), - create_test_metrics(), - ); - - client1.update_auth(HeaderValue::from_static("Bearer token1")); - let mut client2 = client1.clone(); - - // Modify client2's header - client2.update_auth(HeaderValue::from_static("Bearer token2")); - - // client1's header should be unchanged - assert_eq!( - client1.auth_header, - HeaderValue::from_static("Bearer token1") - ); - assert_eq!( - client2.auth_header, - HeaderValue::from_static("Bearer token2") - ); - } - // ==================== Edge Cases ==================== #[test] @@ -584,6 +532,7 @@ mod tests { let client = LogsIngestionClient::from_parts( create_test_http_client(), "".to_string(), + create_test_auth(), create_test_metrics(), ); diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs index 068621bd47..ec89f84e9f 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +use otap_df_engine::extensions::auth::ClientAuthConfig; use super::Error; use serde::Deserialize; use serde_json::Value; @@ -13,54 +14,11 @@ pub struct Config { /// API configuration for Azure Monitor pub api: ApiConfig, - /// Authentication configuration - #[serde(default)] - pub auth: AuthConfig, -} - -/// Authentication method for Azure -#[derive(Debug, Deserialize, Clone, PartialEq, Default)] -#[serde(rename_all = "lowercase")] -pub enum AuthMethod { - /// Use Managed Identity (system or user-assigned with client_id) - #[serde(alias = "msi", alias = "managed_identity")] - #[default] - ManagedIdentity, - - /// Use developer tools (Azure CLI, Azure Developer CLI) - #[serde(alias = "dev", alias = "developer", alias = "cli")] - Development, -} - -/// Authentication configuration for Azure -#[derive(Debug, Deserialize, Clone)] -pub struct AuthConfig { - /// Authentication method to use - #[serde(default)] - pub method: AuthMethod, - - /// Client ID for user-assigned managed identity (optional) - /// Only used when method is ManagedIdentity - /// If not provided with ManagedIdentity, system-assigned identity will be used - pub client_id: Option, - - /// OAuth scope for token acquisition (defaults to "https://monitor.azure.com/.default") - #[serde(default = "default_scope")] - pub scope: String, -} - -impl Default for AuthConfig { - fn default() -> Self { - Self { - method: AuthMethod::default(), - client_id: None, - scope: default_scope(), - } - } -} - -fn default_scope() -> String { - "https://monitor.azure.com/.default".to_string() + /// Client authentication configuration. + /// + /// The `authenticator` field must match the node name of an + /// `azure_identity_auth` extension declared in the pipeline configuration. + pub auth: ClientAuthConfig, } /// API configuration for connecting to Azure Monitor @@ -99,12 +57,10 @@ pub struct SchemaConfig { impl Config { /// Validate the configuration pub fn validate(&self) -> Result<(), Error> { - // Validate auth configuration - if self.auth.scope.is_empty() { - return Err(Error::Config( - "Invalid configuration: auth scope must be non-empty".to_string(), - )); - } + // Validate auth extension reference + self.auth.validate().map_err(|e| { + Error::Config(format!("Invalid configuration: {e}")) + })?; // Validate API configuration if self.api.dcr_endpoint.is_empty() { @@ -194,11 +150,7 @@ mod tests { dcr: "mydcr".to_string(), schema: SchemaConfig::default(), }, - auth: AuthConfig { - scope: "https://monitor.azure.com/.default".to_string(), - client_id: Some("myclientid".to_string()), - method: AuthMethod::ManagedIdentity, - }, + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; assert!(config.validate().is_ok()); @@ -213,7 +165,7 @@ mod tests { dcr: "".to_string(), schema: SchemaConfig::default(), }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; let result = config.validate(); @@ -224,6 +176,26 @@ mod tests { ); } + #[test] + fn test_invalid_config_missing_auth_authenticator() { + let config = Config { + api: ApiConfig { + dcr_endpoint: "https://example.com".to_string(), + stream_name: "mystream".to_string(), + dcr: "mydcr".to_string(), + schema: SchemaConfig::default(), + }, + auth: ClientAuthConfig { authenticator: "".to_string() }, + }; + + let result = config.validate(); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("auth.authenticator must be non-empty")); + } + #[test] fn test_schema_duplicate_columns() { let config = Config { @@ -241,7 +213,7 @@ mod tests { ]), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; let result = config.validate(); @@ -281,7 +253,7 @@ mod tests { ]), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; let result = config.validate(); @@ -310,7 +282,7 @@ mod tests { )]), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; let result = config.validate(); diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/error.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/error.rs index 936c363985..8ce69ddf67 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/error.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/error.rs @@ -1,7 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -use super::config::AuthMethod; use http::StatusCode; use http::header::InvalidHeaderValue; @@ -21,18 +20,19 @@ pub enum Error { }, // ==================== Authentication Errors ==================== - /// Authentication/authorization error. - #[error("Auth error ({kind}){}", source.as_ref().map(|e| format!(": {}", e)).unwrap_or_default())] + /// Authentication/authorization error from server response. + #[error("Auth error ({kind})")] Auth { /// The kind of authentication error. kind: AuthErrorKind, - /// The underlying Azure error, if any. - #[source] - source: Option, /// Response body for HTTP auth errors (401/403). body: Option, }, + /// Client authenticator (extension) unavailable or returned an error. + #[error("Client auth unavailable: {0}")] + ClientAuthUnavailable(String), + // ==================== HTTP/Network Errors ==================== /// Failed to create HTTP client. #[error("Failed to create HTTP client")] @@ -149,12 +149,6 @@ pub enum Error { /// Authentication error classification. #[derive(Debug, Clone)] pub enum AuthErrorKind { - /// Failed to create credential (during setup). - CreateCredential { method: AuthMethod }, - /// Failed to acquire token. - TokenAcquisition, - /// Token refresh failed during retry. - TokenRefresh, /// Server returned 401. Unauthorized, /// Server returned 403. @@ -164,9 +158,6 @@ pub enum AuthErrorKind { impl std::fmt::Display for AuthErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::CreateCredential { method } => write!(f, "create credential: {method:?}"), - Self::TokenAcquisition => write!(f, "token acquisition"), - Self::TokenRefresh => write!(f, "token refresh"), Self::Unauthorized => write!(f, "unauthorized"), Self::Forbidden => write!(f, "forbidden"), } @@ -242,32 +233,11 @@ impl Error { Self::Network { kind, source } } - /// Creates a credential creation error. - #[must_use] - pub fn create_credential(method: AuthMethod, source: azure_core::error::Error) -> Self { - Self::Auth { - kind: AuthErrorKind::CreateCredential { method }, - source: Some(source), - body: None, - } - } - - /// Creates a token acquisition error. - #[must_use] - pub fn token_acquisition(source: azure_core::error::Error) -> Self { - Self::Auth { - kind: AuthErrorKind::TokenAcquisition, - source: Some(source), - body: None, - } - } - /// Creates an unauthorized (401) error. #[must_use] pub fn unauthorized(body: String) -> Self { Self::Auth { kind: AuthErrorKind::Unauthorized, - source: None, body: Some(body), } } @@ -277,7 +247,6 @@ impl Error { pub fn forbidden(body: String) -> Self { Self::Auth { kind: AuthErrorKind::Forbidden, - source: None, body: Some(body), } } @@ -329,31 +298,13 @@ mod tests { // ==================== Auth Error Tests ==================== #[test] - fn test_auth_create_credential_message() { - let azure_error = azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Credential, - "managed identity not available", - ); - let error = Error::create_credential(AuthMethod::ManagedIdentity, azure_error); + fn test_client_auth_unavailable_message() { + let error = Error::ClientAuthUnavailable("extension not found".to_string()); assert_eq!( error.to_string(), - "Auth error (create credential: ManagedIdentity): managed identity not available" + "Client auth unavailable: extension not found" ); - assert!(error.source().is_some()); - } - - #[test] - fn test_auth_token_acquisition_message() { - let azure_error = azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Credential, - "token expired", - ); - let error = Error::token_acquisition(azure_error); - assert_eq!( - error.to_string(), - "Auth error (token acquisition): token expired" - ); - assert!(error.source().is_some()); + assert!(error.source().is_none()); } #[test] @@ -459,13 +410,7 @@ mod tests { } .is_retryable() ); - assert!( - !Error::token_acquisition(azure_core::error::Error::with_message( - azure_core::error::ErrorKind::Credential, - "test" - )) - .is_retryable() - ); + assert!(!Error::ClientAuthUnavailable("test".to_string()).is_retryable()); } // ==================== Display Tests ==================== @@ -481,18 +426,6 @@ mod tests { #[test] fn test_auth_error_kind_display() { - assert_eq!( - AuthErrorKind::CreateCredential { - method: AuthMethod::ManagedIdentity - } - .to_string(), - "create credential: ManagedIdentity" - ); - assert_eq!( - AuthErrorKind::TokenAcquisition.to_string(), - "token acquisition" - ); - assert_eq!(AuthErrorKind::TokenRefresh.to_string(), "token refresh"); assert_eq!(AuthErrorKind::Unauthorized.to_string(), "unauthorized"); assert_eq!(AuthErrorKind::Forbidden.to_string(), "forbidden"); } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs index 2fe77b0c50..b318195a5b 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use async_trait::async_trait; -use azure_core::credentials::AccessToken; use otap_df_channel::error::RecvError; use otap_df_config::SignalType; use otap_df_engine::ConsumerEffectHandlerExtension; @@ -10,6 +9,7 @@ use otap_df_engine::context::PipelineContext; use otap_df_engine::control::{AckMsg, NackMsg, NodeControlMsg}; use otap_df_engine::error::Error as EngineError; use otap_df_engine::extensions::ExtensionRegistry; +use otap_df_engine::extensions::auth::ClientAuthenticatorHandle; use otap_df_engine::local::exporter::{EffectHandler, Exporter}; use otap_df_engine::message::{Message, MessageChannel}; use otap_df_engine::terminal_state::TerminalState; @@ -19,7 +19,6 @@ use otap_df_pdata::views::otlp::bytes::logs::RawLogsData; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; use otap_df_pdata_views::views::logs::LogsDataView; -use super::auth::Auth; use super::client::LogsIngestionClientPool; use super::config::Config; use super::error::Error; @@ -31,7 +30,6 @@ use super::metrics::{AzureMonitorExporterMetrics, AzureMonitorExporterMetricsRc} use super::state::AzureMonitorExporterState; use super::transformer::Transformer; use otap_df_otap::pdata::{Context, OtapPdata}; -use reqwest::header::HeaderValue; use otap_df_telemetry::{otel_debug, otel_error, otel_info, otel_warn}; @@ -41,12 +39,6 @@ use std::rc::Rc; const MAX_IN_FLIGHT_EXPORTS: usize = 16; const PERIODIC_EXPORT_INTERVAL: u64 = 3; const HEARTBEAT_INTERVAL_SECONDS: u64 = 60; -/// Minimum interval between token refresh attempts (10 seconds). -const MIN_TOKEN_REFRESH_INTERVAL_SECS: u64 = 10; -/// Buffer time before token expiry to trigger a refresh. -/// Azure Identity SDK caches tokens internally and won't issue a new token -/// until ~5 minutes before expiry, so we schedule refresh at 295 seconds before expiry. -const TOKEN_EXPIRY_BUFFER_SECS: u64 = 295; /// Azure Monitor exporter. pub struct AzureMonitorExporter { @@ -58,7 +50,6 @@ pub struct AzureMonitorExporter { client_pool: LogsIngestionClientPool, in_flight_exports: InFlightExports, last_batch_queued_at: tokio::time::Instant, - heartbeat: Heartbeat, } impl AzureMonitorExporter { @@ -81,9 +72,6 @@ impl AzureMonitorExporter { // Create Gzip batcher let gzip_batcher = GzipBatcher::new(); - // Create heartbeat handler - let heartbeat = Heartbeat::new(&config.api)?; - Ok(Self { config, transformer, @@ -93,7 +81,6 @@ impl AzureMonitorExporter { client_pool: LogsIngestionClientPool::new(MAX_IN_FLIGHT_EXPORTS + 1, metrics), in_flight_exports: InFlightExports::new(MAX_IN_FLIGHT_EXPORTS), last_batch_queued_at: tokio::time::Instant::now(), - heartbeat, }) } @@ -357,25 +344,6 @@ impl AzureMonitorExporter { Ok(()) } - #[inline] - fn get_next_token_refresh(token: AccessToken) -> tokio::time::Instant { - let now = azure_core::time::OffsetDateTime::now_utc(); - let duration_remaining = if token.expires_on > now { - (token.expires_on - now).unsigned_abs() - } else { - std::time::Duration::ZERO - }; - - let token_valid_until = tokio::time::Instant::now() + duration_remaining; - let next_token_refresh = - token_valid_until - tokio::time::Duration::from_secs(TOKEN_EXPIRY_BUFFER_SECS); - std::cmp::max( - next_token_refresh, - tokio::time::Instant::now() - + tokio::time::Duration::from_secs(MIN_TOKEN_REFRESH_INTERVAL_SECS), - ) - } - async fn handle_message( &mut self, effect_handler: &EffectHandler, @@ -463,7 +431,7 @@ impl Exporter for AzureMonitorExporter { mut self: Box, mut msg_chan: MessageChannel, effect_handler: EffectHandler, - _extension_registry: ExtensionRegistry, + extension_registry: ExtensionRegistry, ) -> Result { effect_handler .info(&format!( @@ -474,15 +442,18 @@ impl Exporter for AzureMonitorExporter { let mut msg_id = 0; - let mut auth = Auth::new(&self.config.auth, self.metrics.clone()).map_err(|e| { - let error = Error::AuthHandlerCreation(Box::new(e)); - EngineError::InternalError { - message: error.to_string(), - } - })?; + // Get the client authenticator handle from the auth extension + let auth = extension_registry + .get::(&self.config.auth.authenticator) + .map_err(|e| EngineError::InternalError { + message: format!( + "Failed to get auth extension '{}': {e}", + self.config.auth.authenticator + ), + })?; self.client_pool - .initialize(&self.config.api) + .initialize(&self.config.api, auth.clone()) .await .map_err(|e| { let error = Error::ClientPoolInit(Box::new(e)); @@ -491,6 +462,13 @@ impl Exporter for AzureMonitorExporter { } })?; + // Create heartbeat handler (needs auth handle) + let mut heartbeat = Heartbeat::new(&self.config.api, auth).map_err(|e| { + EngineError::InternalError { + message: format!("Failed to create heartbeat handler: {e}"), + } + })?; + // Start periodic telemetry collection and retain the cancel handle for graceful shutdown let telemetry_timer_cancel_handle = effect_handler .start_periodic_telemetry(std::time::Duration::from_secs(1)) @@ -499,7 +477,6 @@ impl Exporter for AzureMonitorExporter { message: format!("Failed to start telemetry timer: {e}"), })?; - let mut next_token_refresh = tokio::time::Instant::now(); let mut next_periodic_export = tokio::time::Instant::now() + tokio::time::Duration::from_secs(PERIODIC_EXPORT_INTERVAL); let mut next_heartbeat_send = tokio::time::Instant::now(); @@ -511,45 +488,10 @@ impl Exporter for AzureMonitorExporter { tokio::select! { biased; - _ = tokio::time::sleep_until(next_token_refresh) => { - match auth.get_token().await { - Ok(access_token) => { - match HeaderValue::from_str(&format!("Bearer {}", access_token.token.secret())) { - Ok(header) => { - self.client_pool.update_auth(header.clone()); - self.heartbeat.update_auth(header.clone()); - - // Schedule next token refresh - next_token_refresh = Self::get_next_token_refresh(access_token); - - let refresh_in = next_token_refresh.saturating_duration_since(tokio::time::Instant::now()); - let total_secs = refresh_in.as_secs(); - let hours = total_secs / 3600; - let minutes = (total_secs % 3600) / 60; - let seconds = total_secs % 60; - - otel_info!("azure_monitor_exporter.auth.token_refresh", refresh_in = format!("{}h {}m {}s", hours, minutes, seconds)); - } - Err(e) => { - otel_error!("azure_monitor_exporter.auth.header_creation_failed", error = ?e); - // Retry every 10 seconds - next_token_refresh = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10); - } - } - - } - Err(e) => { - otel_error!("azure_monitor_exporter.auth.token_refresh_failed", error = ?e); - // Retry every 10 seconds - next_token_refresh = tokio::time::Instant::now() + tokio::time::Duration::from_secs(10); - } - } - } - _ = tokio::time::sleep_until(next_heartbeat_send) => { next_heartbeat_send = tokio::time::Instant::now() + tokio::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECONDS); self.metrics.borrow_mut().add_heartbeat(); - match self.heartbeat.send().await { + match heartbeat.send().await { Ok(_) => otel_debug!("azure_monitor_exporter.heartbeat.sent"), Err(e) => otel_warn!("azure_monitor_exporter.heartbeat.send_failed", error = ?e), } @@ -623,12 +565,12 @@ impl Exporter for AzureMonitorExporter { #[cfg(test)] mod tests { - use super::super::config::{ApiConfig, AuthConfig, SchemaConfig}; + use super::super::config::{ApiConfig, SchemaConfig}; use super::*; - use azure_core::time::OffsetDateTime; use bytes::Bytes; use http::StatusCode; use otap_df_engine::context::{ControllerContext, PipelineContext}; + use otap_df_engine::extensions::auth::ClientAuthConfig; use otap_df_engine::local::exporter::EffectHandler; use otap_df_engine::node::NodeId; use otap_df_otap::pdata::Context; @@ -654,7 +596,7 @@ mod tests { log_record_mapping: HashMap::new(), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, } } @@ -665,31 +607,6 @@ mod tests { let _ = AzureMonitorExporter::new(pipeline_ctx, config).unwrap(); } - #[test] - fn test_get_next_token_refresh_logic() { - let now = OffsetDateTime::now_utc(); - let expires_on = now + azure_core::time::Duration::seconds(3600); - - let token = AccessToken { - token: "secret".into(), - expires_on, - }; - - let refresh_at = AzureMonitorExporter::get_next_token_refresh(token); - let duration_until_refresh = refresh_at.duration_since(tokio::time::Instant::now()); - - // Should be 3600 - 295 = 3305 seconds before refresh - // Allow some delta for execution time - let expected = 3305.0; - let actual = duration_until_refresh.as_secs_f64(); - assert!( - (actual - expected).abs() < 5.0, - "Expected ~{}, got {}", - expected, - actual - ); - } - #[tokio::test] async fn test_handle_export_success() { let config = create_test_config(); diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs index e276712385..e250d42095 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs @@ -7,9 +7,10 @@ use super::AZURE_MONITOR_EXPORTER_URN; use super::config::ApiConfig; use super::error::Error; use chrono::Utc; +use otap_df_engine::extensions::auth::ClientAuthenticatorHandle; use reqwest::{ Client, - header::{AUTHORIZATION, CONTENT_TYPE, HeaderValue}, + header::CONTENT_TYPE, }; use std::time::Duration; use sysinfo::System; @@ -23,8 +24,8 @@ pub struct Heartbeat { endpoint: String, heartbeat_row: HeartbeatRow, - /// Pre-formatted authorization header for zero-allocation reuse - pub auth_header: HeaderValue, + /// Client authenticator handle provided by an auth extension. + auth: ClientAuthenticatorHandle, } #[derive(Serialize)] @@ -109,7 +110,7 @@ fn default_heartbeat_os_minor_version() -> String { impl Heartbeat { /// Create a new Heartbeat instance. - pub fn new(config: &ApiConfig) -> Result { + pub fn new(config: &ApiConfig, auth: ClientAuthenticatorHandle) -> Result { let http_client = Client::builder() .http1_only() .timeout(Duration::from_secs(30)) @@ -133,14 +134,14 @@ impl Heartbeat { os_major_version: default_heartbeat_os_major_version(), os_minor_version: default_heartbeat_os_minor_version(), }, - auth_header: HeaderValue::from_static("Bearer "), + auth, }) } /// Create a Heartbeat from individual components (for testing). #[cfg(test)] #[must_use] - pub fn from_parts(client: Client, endpoint: String) -> Self { + pub fn from_parts(client: Client, endpoint: String, auth: ClientAuthenticatorHandle) -> Self { Self { client, endpoint, @@ -152,24 +153,30 @@ impl Heartbeat { os_major_version: "1".to_string(), os_minor_version: "0".to_string(), }, - auth_header: HeaderValue::from_static("Bearer "), + auth, } } - /// Update the authorization header with a new access token. - pub fn update_auth(&mut self, header: HeaderValue) { - self.auth_header = header; - } - /// Send a heartbeat to the Azure Monitor Logs Ingestion endpoint. pub async fn send(&mut self) -> Result<(), Error> { self.heartbeat_row.time = Utc::now().to_rfc3339(); let payload = serde_json::json!([self.heartbeat_row]); - let response = self + + let auth_headers = self + .auth + .get_request_metadata() + .map_err(|e| Error::ClientAuthUnavailable(e.to_string()))?; + + let mut request = self .client .post(&self.endpoint) - .header(CONTENT_TYPE, "application/json") - .header(AUTHORIZATION, &self.auth_header) + .header(CONTENT_TYPE, "application/json"); + + for (name, value) in auth_headers { + request = request.header(name, value); + } + + let response = request .body(payload.to_string()) .send() .await @@ -208,10 +215,30 @@ impl Heartbeat { #[cfg(test)] mod tests { use super::*; + use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator}; use std::collections::HashMap; use wiremock::matchers::method; use wiremock::{Mock, MockServer, ResponseTemplate}; + // ==================== Test Auth Helper ==================== + + struct TestAuth; + + impl ClientAuthenticator for TestAuth { + fn get_request_metadata( + &self, + ) -> Result, AuthError> { + Ok(vec![( + http::header::AUTHORIZATION, + http::HeaderValue::from_static("Bearer test_token"), + )]) + } + } + + fn create_test_auth() -> ClientAuthenticatorHandle { + ClientAuthenticatorHandle::new(TestAuth) + } + // ==================== Test Helpers ==================== fn create_test_client() -> Client { @@ -378,10 +405,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - // Set up auth header for the test - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_ok()); @@ -396,9 +422,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -422,9 +448,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -452,9 +478,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -475,9 +501,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -498,9 +524,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -519,9 +545,9 @@ mod tests { .mount(&mock_server) .await; - let mut heartbeat = Heartbeat::from_parts(create_test_client(), mock_server.uri()); + let mut heartbeat = + Heartbeat::from_parts(create_test_client(), mock_server.uri(), create_test_auth()); - heartbeat.update_auth(HeaderValue::from_static("Bearer test_token")); let result = heartbeat.send().await; assert!(result.is_err()); @@ -537,8 +563,11 @@ mod tests { #[test] fn test_from_parts_creates_heartbeat() { - let heartbeat = - Heartbeat::from_parts(create_test_client(), "https://example.com".to_string()); + let heartbeat = Heartbeat::from_parts( + create_test_client(), + "https://example.com".to_string(), + create_test_auth(), + ); assert_eq!(heartbeat.endpoint, "https://example.com"); // Verify heartbeat row has default values @@ -546,21 +575,4 @@ mod tests { assert!(!heartbeat.heartbeat_row.os_name.is_empty()); assert!(!heartbeat.heartbeat_row.computer.is_empty()); } - - // ==================== update_auth Tests ==================== - - #[test] - fn test_update_auth_changes_header() { - let mut heartbeat = - Heartbeat::from_parts(create_test_client(), "https://example.com".to_string()); - - assert_eq!(heartbeat.auth_header, HeaderValue::from_static("Bearer ")); - - heartbeat.update_auth(HeaderValue::from_static("Bearer new_token")); - - assert_eq!( - heartbeat.auth_header, - HeaderValue::from_static("Bearer new_token") - ); - } } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs index d601da3459..44c3847e00 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs @@ -126,6 +126,17 @@ mod tests { } fn create_test_client() -> LogsIngestionClient { + use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator, ClientAuthenticatorHandle}; + + struct TestAuth; + impl ClientAuthenticator for TestAuth { + fn get_request_metadata( + &self, + ) -> Result, AuthError> { + Ok(vec![(http::header::AUTHORIZATION, http::HeaderValue::from_static("Bearer test"))]) + } + } + // Use a client that will fail fast if actually used let http_client = Client::builder() .timeout(StdDuration::from_millis(1)) @@ -135,6 +146,7 @@ mod tests { LogsIngestionClient::from_parts( http_client, "http://localhost".to_string(), + ClientAuthenticatorHandle::new(TestAuth), create_test_metrics(), ) } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/mod.rs index 0cddc2ce43..bdf160bd53 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/mod.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use otap_df_otap::OTAP_EXPORTER_FACTORIES; use otap_df_otap::pdata::OtapPdata; -mod auth; mod client; mod config; mod error; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/otlp-ame.yaml b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/otlp-ame.yaml index b8f3dac8c5..6f9fcea724 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/otlp-ame.yaml +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/otlp-ame.yaml @@ -25,6 +25,23 @@ groups: pipelines: main: nodes: + # Extension node: provides Azure AD authentication for exporters. + azure-auth: + type: "urn:microsoft:extension:azure_identity_auth" + config: + # Use managed identity for AKS workloads + method: "managedidentity" # or "msi" + + # For user-assigned managed identity (common in AKS) + # Get this from your AKS managed identity configuration + client_id: "YOUR-USER-ASSIGNED-IDENTITY-CLIENT-ID" + + # For system-assigned managed identity (less common in AKS) + # Comment out the client_id field above + + # OAuth scope for Azure Monitor + scope: "https://monitor.azure.com/.default" + otlp-receiver: type: "urn:otel:receiver:otlp" config: @@ -70,20 +87,9 @@ groups: "exception.message": "ExceptionMessage" "user.id": "UserId" - # Authentication configuration for AKS + # Reference to the auth extension declared above auth: - # Use managed identity for AKS workloads - method: "managedidentity" # or "msi" - - # For user-assigned managed identity (common in AKS) - # Get this from your AKS managed identity configuration - client_id: "YOUR-USER-ASSIGNED-IDENTITY-CLIENT-ID" - - # For system-assigned managed identity (less common in AKS) - # Comment out the client_id field above - - # OAuth scope for Azure Monitor - scope: "https://monitor.azure.com/.default" + authenticator: "azure-auth" connections: - from: otlp-receiver diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs index 5c5ab2f9f2..6570530383 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs @@ -411,7 +411,8 @@ mod tests { } fn create_test_config() -> Config { - use super::super::config::{ApiConfig, AuthConfig, SchemaConfig}; + use super::super::config::{ApiConfig, SchemaConfig}; + use otap_df_engine::extensions::auth::ClientAuthConfig; Config { api: ApiConfig { @@ -431,7 +432,7 @@ mod tests { ]), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, } } @@ -719,7 +720,8 @@ mod tests { #[test] fn test_empty_schema_mappings() { - use super::super::config::{ApiConfig, AuthConfig, SchemaConfig}; + use super::super::config::{ApiConfig, SchemaConfig}; + use otap_df_engine::extensions::auth::ClientAuthConfig; let config = Config { api: ApiConfig { @@ -732,11 +734,12 @@ mod tests { log_record_mapping: HashMap::new(), }, }, - auth: AuthConfig::default(), + auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, }; let transformer = Transformer::new(&config, create_test_metrics()); + let request = ExportLogsServiceRequest { resource_logs: vec![ResourceLogs { resource: Some(Resource { diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs new file mode 100644 index 0000000000..b0441baed8 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs @@ -0,0 +1,644 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Azure Identity authentication extension. +//! +//! This extension acquires Azure AD tokens using `azure_identity` credentials +//! (Managed Identity or Developer Tools) and exposes them as a +//! [`ClientAuthenticatorHandle`] so that exporters and other pipeline +//! components can attach `Authorization: Bearer ` headers without +//! managing their own token lifecycle. +//! +//! # Token Refresh +//! +//! The extension's [`start`](Extension::start) loop proactively refreshes +//! the token before it expires. Consumers call +//! [`ClientAuthenticatorHandle::get_request_metadata`] to pull the latest +//! cached header — no push/update plumbing is required. +//! +//! # Example YAML Configuration +//! +//! ```yaml +//! nodes: +//! azure_auth: +//! type: "urn:microsoft:extension:azure_identity_auth" +//! config: +//! method: managed_identity +//! scope: "https://monitor.azure.com/.default" +//! ``` + +use async_trait::async_trait; +use azure_core::credentials::{AccessToken, TokenCredential}; +use azure_identity::{ + DeveloperToolsCredential, DeveloperToolsCredentialOptions, ManagedIdentityCredential, + ManagedIdentityCredentialOptions, UserAssignedId, +}; +use linkme::distributed_slice; +use otap_df_config::node::NodeUserConfig; +use otap_df_engine::ExtensionFactory; +use otap_df_engine::config::ExtensionConfig; +use otap_df_engine::context::PipelineContext; +use otap_df_engine::control::ExtensionControlMsg; +use otap_df_engine::error::Error as EngineError; +use otap_df_engine::extension::ExtensionWrapper; +use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator, ClientAuthenticatorHandle}; +use otap_df_engine::extensions::ExtensionHandles; +use otap_df_engine::local::extension::{self as local, ControlChannel, EffectHandler}; +use otap_df_engine::node::NodeId; +use otap_df_telemetry::{otel_debug, otel_error, otel_info, otel_warn}; +use http::header::HeaderValue; +use serde::Deserialize; +use std::sync::{Arc, Mutex}; + +use otap_df_otap::OTAP_EXTENSION_FACTORIES; + +/// URN identifying the Azure Identity Auth extension in configuration. +pub const AZURE_IDENTITY_AUTH_URN: &str = "urn:microsoft:extension:azure_identity_auth"; + +/// Minimum delay between token refresh retry attempts in seconds. +const MIN_RETRY_DELAY_SECS: f64 = 5.0; +/// Maximum delay between token refresh retry attempts in seconds. +const MAX_RETRY_DELAY_SECS: f64 = 30.0; +/// Maximum jitter percentage (±10%) to add to retry delays. +const MAX_RETRY_JITTER_RATIO: f64 = 0.10; +/// Minimum interval between token refresh attempts (10 seconds). +const MIN_TOKEN_REFRESH_INTERVAL_SECS: u64 = 10; +/// Buffer time before token expiry to trigger a refresh. +/// Azure Identity SDK caches tokens internally and won't issue a new token +/// until ~5 minutes before expiry, so we schedule refresh at 295 seconds +/// before expiry. +const TOKEN_EXPIRY_BUFFER_SECS: u64 = 295; + +// ─── Configuration ──────────────────────────────────────────────────────── + +/// Authentication method for Azure credentials. +#[derive(Debug, Deserialize, Clone, PartialEq, Default)] +#[serde(rename_all = "lowercase")] +pub enum AuthMethod { + /// Use Managed Identity (system or user-assigned with client_id). + #[serde(alias = "msi", alias = "managed_identity")] + #[default] + ManagedIdentity, + + /// Use developer tools (Azure CLI, Azure Developer CLI). + #[serde(alias = "dev", alias = "developer", alias = "cli")] + Development, +} + +/// Configuration for the Azure Identity Auth extension. +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct Config { + /// Authentication method to use. + #[serde(default)] + pub method: AuthMethod, + + /// Client ID for user-assigned managed identity (optional). + /// Only used when method is ManagedIdentity. + /// If not provided with ManagedIdentity, system-assigned identity will be used. + pub client_id: Option, + + /// OAuth scope for token acquisition (defaults to "https://monitor.azure.com/.default"). + #[serde(default = "default_scope")] + pub scope: String, +} + +fn default_scope() -> String { + "https://monitor.azure.com/.default".to_string() +} + +// ─── ClientAuthenticator implementation ─────────────────────────────────── + +/// A [`ClientAuthenticator`] backed by a cached Azure AD bearer token. +/// +/// The extension's background loop refreshes the token and stores it in the +/// shared `Arc>`. Consumers (exporters, heartbeat clients, etc.) +/// read the latest cached header on every call to [`get_request_metadata`]. +struct AzureIdentityClientAuth { + cached_header: Arc>>, +} + +impl ClientAuthenticator for AzureIdentityClientAuth { + fn get_request_metadata( + &self, + ) -> Result, AuthError> { + let header = self + .cached_header + .lock() + .expect("cached_header lock poisoned") + .clone(); + match header { + Some(h) => Ok(vec![(http::header::AUTHORIZATION, h)]), + None => Err(AuthError { + message: "Azure AD token not yet available".into(), + }), + } + } +} + +// ─── Extension implementation ───────────────────────────────────────────── + +/// The Azure Identity Auth extension. +/// +/// Runs a background loop that proactively refreshes an Azure AD access token +/// and stores the formatted `Authorization: Bearer ` header value for +/// consumers to pull via [`ClientAuthenticatorHandle::get_request_metadata`]. +struct AzureIdentityAuthExtension { + credential: Arc, + scope: String, + cached_header: Arc>>, +} + +impl AzureIdentityAuthExtension { + /// Acquire a token with retry and exponential backoff. + async fn get_token(&self) -> Result { + let mut attempt = 0_i32; + loop { + attempt += 1; + match self + .credential + .get_token( + &[&self.scope], + Some(azure_core::credentials::TokenRequestOptions::default()), + ) + .await + { + Ok(token) => { + otel_debug!( + "azure_identity_auth.get_token_succeeded", + expires_on = %token.expires_on + ); + return Ok(token); + } + Err(e) => { + otel_warn!( + "azure_identity_auth.get_token_failed", + attempt = attempt, + error = %e + ); + } + } + + // Exponential backoff: 5s, 10s, 20s, 30s (capped) + let base_delay_secs = MIN_RETRY_DELAY_SECS * 2.0_f64.powi(attempt - 1); + let capped_delay_secs = base_delay_secs.min(MAX_RETRY_DELAY_SECS); + + // Add jitter: ±10% + let jitter_range = capped_delay_secs * MAX_RETRY_JITTER_RATIO; + let jitter = if jitter_range > 0.0 { + let random_factor = rand::random::() * 2.0 - 1.0; + random_factor * jitter_range + } else { + 0.0 + }; + + let delay_secs = (capped_delay_secs + jitter).max(1.0); + let delay = tokio::time::Duration::from_secs_f64(delay_secs); + + otel_warn!( + "azure_identity_auth.retry_scheduled", + delay_secs = %delay_secs + ); + tokio::time::sleep(delay).await; + } + } +} + +/// Compute the next token refresh instant based on the token's expiry time. +fn get_next_token_refresh(token: &AccessToken) -> tokio::time::Instant { + let now = azure_core::time::OffsetDateTime::now_utc(); + let duration_remaining = if token.expires_on > now { + (token.expires_on - now).unsigned_abs() + } else { + std::time::Duration::ZERO + }; + + let token_valid_until = tokio::time::Instant::now() + duration_remaining; + let next_token_refresh = + token_valid_until - tokio::time::Duration::from_secs(TOKEN_EXPIRY_BUFFER_SECS); + std::cmp::max( + next_token_refresh, + tokio::time::Instant::now() + + tokio::time::Duration::from_secs(MIN_TOKEN_REFRESH_INTERVAL_SECS), + ) +} + +#[async_trait(?Send)] +impl local::Extension for AzureIdentityAuthExtension { + async fn start( + self: Box, + mut ctrl_chan: ControlChannel, + effect_handler: EffectHandler, + ) -> Result<(), EngineError> { + effect_handler + .info(&format!( + "[AzureIdentityAuth] Starting: scope={}", + self.scope + )) + .await; + + let mut next_token_refresh = tokio::time::Instant::now(); + + loop { + tokio::select! { + biased; + + msg = ctrl_chan.recv() => { + match msg { + Ok(ExtensionControlMsg::Shutdown { .. }) => { + otel_info!("azure_identity_auth.shutdown"); + break; + } + Ok(_) => {} // ignore other control messages + Err(_) => break, + } + } + + _ = tokio::time::sleep_until(next_token_refresh) => { + match self.get_token().await { + Ok(access_token) => { + match HeaderValue::from_str( + &format!("Bearer {}", access_token.token.secret()), + ) { + Ok(header) => { + *self + .cached_header + .lock() + .expect("cached_header lock poisoned") = Some(header); + + next_token_refresh = get_next_token_refresh(&access_token); + + let refresh_in = next_token_refresh + .saturating_duration_since(tokio::time::Instant::now()); + let total_secs = refresh_in.as_secs(); + let hours = total_secs / 3600; + let minutes = (total_secs % 3600) / 60; + let seconds = total_secs % 60; + + otel_info!( + "azure_identity_auth.token_refresh", + refresh_in = + format!("{}h {}m {}s", hours, minutes, seconds) + ); + } + Err(e) => { + otel_error!( + "azure_identity_auth.header_creation_failed", + error = ?e + ); + next_token_refresh = tokio::time::Instant::now() + + tokio::time::Duration::from_secs( + MIN_TOKEN_REFRESH_INTERVAL_SECS, + ); + } + } + } + Err(e) => { + otel_error!( + "azure_identity_auth.token_refresh_failed", + error = %e + ); + next_token_refresh = tokio::time::Instant::now() + + tokio::time::Duration::from_secs( + MIN_TOKEN_REFRESH_INTERVAL_SECS, + ); + } + } + } + } + } + Ok(()) + } +} + +// ─── Credential creation ────────────────────────────────────────────────── + +fn create_credential( + config: &Config, +) -> Result, otap_df_config::error::Error> { + match config.method { + AuthMethod::ManagedIdentity => { + let mut options = ManagedIdentityCredentialOptions::default(); + + if let Some(client_id) = &config.client_id { + otel_info!( + "azure_identity_auth.credential_type", + method = "user_assigned_managed_identity", + client_id = %client_id + ); + options.user_assigned_id = Some(UserAssignedId::ClientId(client_id.clone())); + } else { + otel_info!( + "azure_identity_auth.credential_type", + method = "system_assigned_managed_identity" + ); + } + + ManagedIdentityCredential::new(Some(options)).map(|c| c as Arc).map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { + error: format!("Failed to create managed identity credential: {e}"), + } + }) + } + AuthMethod::Development => { + otel_info!( + "azure_identity_auth.credential_type", + method = "developer_tools" + ); + DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default())).map(|c| c as Arc).map_err( + |e| otap_df_config::error::Error::InvalidUserConfig { + error: format!("Failed to create developer tools credential: {e}"), + }, + ) + } + } +} + +// ─── Factory registration ───────────────────────────────────────────────── + +/// Register the Azure Identity Auth extension with the OTAP pipeline factory. +#[allow(unsafe_code)] +#[distributed_slice(OTAP_EXTENSION_FACTORIES)] +pub static AZURE_IDENTITY_AUTH_EXTENSION: ExtensionFactory = ExtensionFactory { + name: AZURE_IDENTITY_AUTH_URN, + create: |_pipeline_ctx: PipelineContext, + node: NodeId, + node_config: Arc, + extension_config: &ExtensionConfig| { + // Deserialize user config JSON into typed Config + let cfg: Config = serde_json::from_value(node_config.config.clone()).map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { + error: e.to_string(), + } + })?; + + // Validate scope is non-empty + if cfg.scope.is_empty() { + return Err(otap_df_config::error::Error::InvalidUserConfig { + error: "auth scope must be non-empty".to_string(), + }); + } + + // Create the Azure credential + let credential = create_credential(&cfg)?; + + // Shared state: the cached Authorization header + let cached_header: Arc>> = Arc::new(Mutex::new(None)); + + // Build the ClientAuthenticator handle for consumers + let auth = AzureIdentityClientAuth { + cached_header: cached_header.clone(), + }; + let mut handles = ExtensionHandles::new(); + handles.register(ClientAuthenticatorHandle::new(auth)); + + // Build the extension instance + let extension = AzureIdentityAuthExtension { + credential, + scope: cfg.scope, + cached_header, + }; + + Ok(ExtensionWrapper::local( + extension, + handles, + node, + node_config, + extension_config, + )) + }, + validate_config: otap_df_config::validation::validate_typed_config::, +}; + +#[cfg(test)] +mod tests { + use super::*; + use azure_core::credentials::TokenRequestOptions; + use azure_core::time::OffsetDateTime; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Debug)] + struct MockCredential { + token: String, + expires_in: azure_core::time::Duration, + call_count: Arc, + } + + #[async_trait::async_trait] + impl TokenCredential for MockCredential { + async fn get_token( + &self, + _scopes: &[&str], + _options: Option>, + ) -> azure_core::Result { + let _ = self.call_count.fetch_add(1, Ordering::SeqCst); + Ok(AccessToken { + token: self.token.clone().into(), + expires_on: OffsetDateTime::now_utc() + self.expires_in, + }) + } + } + + fn mock_credential( + token: &str, + expires_in: azure_core::time::Duration, + ) -> (Arc, Arc) { + let call_count = Arc::new(AtomicUsize::new(0)); + let cred: Arc = Arc::new(MockCredential { + token: token.to_string(), + expires_in, + call_count: call_count.clone(), + }); + (cred, call_count) + } + + #[test] + fn test_urn_constant() { + assert_eq!( + AZURE_IDENTITY_AUTH_URN, + "urn:microsoft:extension:azure_identity_auth" + ); + } + + #[test] + fn test_client_auth_returns_none_before_refresh() { + let cached = Arc::new(Mutex::new(None)); + let auth = AzureIdentityClientAuth { + cached_header: cached, + }; + + let err = auth.get_request_metadata().unwrap_err(); + assert!(err.message.contains("not yet available")); + } + + #[test] + fn test_client_auth_returns_cached_header() { + let cached = Arc::new(Mutex::new(Some( + HeaderValue::from_static("Bearer test-token"), + ))); + let auth = AzureIdentityClientAuth { + cached_header: cached, + }; + + let metadata = auth.get_request_metadata().unwrap(); + assert_eq!(metadata.len(), 1); + assert_eq!(metadata[0].0, http::header::AUTHORIZATION); + assert_eq!(metadata[0].1, "Bearer test-token"); + } + + #[test] + fn test_client_auth_sees_updates() { + let cached: Arc>> = Arc::new(Mutex::new(None)); + let auth = AzureIdentityClientAuth { + cached_header: cached.clone(), + }; + + assert!(auth.get_request_metadata().is_err()); + + *cached.lock().unwrap() = Some(HeaderValue::from_static("Bearer v1")); + let meta = auth.get_request_metadata().unwrap(); + assert_eq!(meta[0].1, "Bearer v1"); + + *cached.lock().unwrap() = Some(HeaderValue::from_static("Bearer v2")); + let meta = auth.get_request_metadata().unwrap(); + assert_eq!(meta[0].1, "Bearer v2"); + } + + #[test] + fn test_get_next_token_refresh_far_future() { + let now = OffsetDateTime::now_utc(); + let expires_on = now + azure_core::time::Duration::seconds(3600); + let token = AccessToken { + token: "secret".into(), + expires_on, + }; + + let refresh_at = get_next_token_refresh(&token); + let duration_until_refresh = refresh_at.duration_since(tokio::time::Instant::now()); + + // Should be ~3600 - 295 = 3305 seconds + let expected = 3305.0; + let actual = duration_until_refresh.as_secs_f64(); + assert!( + (actual - expected).abs() < 5.0, + "Expected ~{expected}, got {actual}" + ); + } + + #[test] + fn test_get_next_token_refresh_already_expired() { + let now = OffsetDateTime::now_utc(); + let expires_on = now - azure_core::time::Duration::seconds(100); + let token = AccessToken { + token: "expired".into(), + expires_on, + }; + + let refresh_at = get_next_token_refresh(&token); + let delay = refresh_at.duration_since(tokio::time::Instant::now()); + + // Should be at least MIN_TOKEN_REFRESH_INTERVAL_SECS + assert!(delay.as_secs() >= MIN_TOKEN_REFRESH_INTERVAL_SECS - 1); + } + + #[tokio::test(flavor = "current_thread")] + async fn test_extension_refreshes_token() { + use otap_df_engine::config::ExtensionConfig; + use otap_df_engine::extensions::ExtensionHandles; + use otap_df_engine::testing::test_node; + use otap_df_telemetry::reporter::MetricsReporter; + use std::time::Duration; + + let (credential, call_count) = + mock_credential("my-azure-token", azure_core::time::Duration::seconds(3600)); + + let cached_header: Arc>> = Arc::new(Mutex::new(None)); + + let extension = AzureIdentityAuthExtension { + credential, + scope: "https://monitor.azure.com/.default".to_string(), + cached_header: cached_header.clone(), + }; + + let config = ExtensionConfig::new("test_azure_auth"); + let user_config = Arc::new(NodeUserConfig::new_receiver_config("test_ext")); + let ext = ExtensionWrapper::local( + extension, + ExtensionHandles::new(), + test_node("test_azure_auth"), + user_config, + &config, + ); + + let sender = ext.control_sender(); + let (_metrics_rx, metrics_reporter) = MetricsReporter::create_new_and_receiver(1); + + let local = tokio::task::LocalSet::new(); + local + .run_until(async { + let handle = tokio::task::spawn_local(async move { + ext.start(metrics_reporter).await.expect("extension failed"); + }); + + // Wait for the first token refresh + tokio::time::sleep(Duration::from_millis(100)).await; + + // Verify token was acquired + assert!(call_count.load(Ordering::SeqCst) >= 1); + + // Verify cached header is set + let header = cached_header.lock().unwrap().clone(); + assert!(header.is_some()); + assert_eq!(header.unwrap(), "Bearer my-azure-token"); + + // Shutdown + sender + .send(ExtensionControlMsg::Shutdown { + deadline: std::time::Instant::now(), + reason: "test".to_owned(), + }) + .await + .expect("send failed"); + + tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("extension did not shut down in time") + .expect("join error"); + }) + .await; + } + + #[test] + fn test_config_deserialization_defaults() { + let json = serde_json::json!({}); + let cfg: Config = serde_json::from_value(json).unwrap(); + assert_eq!(cfg.method, AuthMethod::ManagedIdentity); + assert_eq!(cfg.scope, "https://monitor.azure.com/.default"); + assert!(cfg.client_id.is_none()); + } + + #[test] + fn test_config_deserialization_development() { + let json = serde_json::json!({ + "method": "development", + "scope": "https://custom.scope/.default" + }); + let cfg: Config = serde_json::from_value(json).unwrap(); + assert_eq!(cfg.method, AuthMethod::Development); + assert_eq!(cfg.scope, "https://custom.scope/.default"); + } + + #[test] + fn test_config_deserialization_aliases() { + for alias in ["msi", "managed_identity"] { + let json = serde_json::json!({ "method": alias }); + let cfg: Config = serde_json::from_value(json).unwrap(); + assert_eq!(cfg.method, AuthMethod::ManagedIdentity); + } + for alias in ["dev", "developer", "cli"] { + let json = serde_json::json!({ "method": alias }); + let cfg: Config = serde_json::from_value(json).unwrap(); + assert_eq!(cfg.method, AuthMethod::Development); + } + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs new file mode 100644 index 0000000000..2259012d72 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs @@ -0,0 +1,256 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Bearer Token authentication extension (server-side). +//! +//! This extension provides a simple static bearer token check for receivers. +//! Incoming requests must carry an `Authorization: Bearer ` header whose +//! token matches the configured value; requests without a valid header are +//! rejected with an [`AuthError`]. +//! +//! # Example YAML Configuration +//! +//! ```yaml +//! nodes: +//! bearer_auth: +//! type: "urn:otel:extension:bearer_token_auth" +//! config: +//! token: "my-secret-token" +//! +//! otlp-receiver: +//! type: "urn:otel:receiver:otlp" +//! config: +//! auth: +//! authenticator: "bearer_auth" +//! protocols: +//! grpc: +//! listening_addr: "0.0.0.0:4317" +//! ``` + +use async_trait::async_trait; +use linkme::distributed_slice; +use otap_df_config::node::NodeUserConfig; +use otap_df_engine::ExtensionFactory; +use otap_df_engine::config::ExtensionConfig; +use otap_df_engine::context::PipelineContext; +use otap_df_engine::control::ExtensionControlMsg; +use otap_df_engine::error::Error as EngineError; +use otap_df_engine::extension::ExtensionWrapper; +use otap_df_engine::extensions::ExtensionHandles; +use otap_df_engine::extensions::auth::{AuthError, ServerAuthenticator, ServerAuthenticatorHandle}; +use otap_df_engine::local::extension::{self as local, ControlChannel, EffectHandler}; +use otap_df_engine::node::NodeId; +use otap_df_telemetry::otel_info; +use serde::Deserialize; +use std::sync::Arc; + +use otap_df_otap::OTAP_EXTENSION_FACTORIES; + +/// URN identifying the Bearer Token Auth extension in configuration. +pub const BEARER_TOKEN_AUTH_URN: &str = "urn:otel:extension:bearer_token_auth"; + +/// Expected prefix in the Authorization header value. +const BEARER_PREFIX: &str = "Bearer "; + +// ─── Configuration ──────────────────────────────────────────────────────── + +/// Configuration for the Bearer Token Auth extension. +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct Config { + /// The static bearer token that incoming requests must present. + pub token: String, +} + +// ─── ServerAuthenticator implementation ─────────────────────────────────── + +/// A [`ServerAuthenticator`] that validates a static bearer token. +/// +/// Checks the `Authorization` header for a `Bearer ` value matching +/// the configured token. +struct BearerTokenServerAuth { + /// The expected token value (without the "Bearer " prefix). + expected_token: String, +} + +impl ServerAuthenticator for BearerTokenServerAuth { + fn authenticate(&self, headers: &http::HeaderMap) -> Result<(), AuthError> { + let auth_header = headers + .get(http::header::AUTHORIZATION) + .ok_or_else(|| AuthError { + message: "missing Authorization header".into(), + })?; + + let auth_value = auth_header.to_str().map_err(|_| AuthError { + message: "Authorization header contains invalid characters".into(), + })?; + + if !auth_value.starts_with(BEARER_PREFIX) { + return Err(AuthError { + message: "Authorization header is not a Bearer token".into(), + }); + } + + let provided_token = &auth_value[BEARER_PREFIX.len()..]; + if provided_token != self.expected_token { + return Err(AuthError { + message: "invalid bearer token".into(), + }); + } + + Ok(()) + } +} + +// ─── Extension implementation ───────────────────────────────────────────── + +/// The Bearer Token Auth extension. +/// +/// This extension has no background work — it simply awaits shutdown. +/// The authentication logic lives in [`BearerTokenServerAuth`] which is +/// registered as a [`ServerAuthenticatorHandle`] for receivers to consume. +struct BearerTokenAuthExtension; + +#[async_trait(?Send)] +impl local::Extension for BearerTokenAuthExtension { + async fn start( + self: Box, + mut ctrl_chan: ControlChannel, + _effect_handler: EffectHandler, + ) -> Result<(), EngineError> { + // No background work — just wait for shutdown. + loop { + match ctrl_chan.recv().await { + Ok(ExtensionControlMsg::Shutdown { .. }) => { + otel_info!("bearer_token_auth.shutdown"); + break; + } + Ok(_) => {} // ignore other control messages + Err(_) => break, + } + } + Ok(()) + } +} + +// ─── Factory registration ───────────────────────────────────────────────── + +/// Register the Bearer Token Auth extension with the OTAP pipeline factory. +#[allow(unsafe_code)] +#[distributed_slice(OTAP_EXTENSION_FACTORIES)] +pub static BEARER_TOKEN_AUTH_EXTENSION: ExtensionFactory = ExtensionFactory { + name: BEARER_TOKEN_AUTH_URN, + create: |_pipeline_ctx: PipelineContext, + node: NodeId, + node_config: Arc, + extension_config: &ExtensionConfig| { + // Deserialize user config JSON into typed Config + let cfg: Config = serde_json::from_value(node_config.config.clone()).map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { + error: e.to_string(), + } + })?; + + // Build the ServerAuthenticator handle for receivers + let auth = BearerTokenServerAuth { + expected_token: cfg.token, + }; + let mut handles = ExtensionHandles::new(); + handles.register(ServerAuthenticatorHandle::new(auth)); + + // Build the extension instance (no background state needed) + let extension = BearerTokenAuthExtension; + + Ok(ExtensionWrapper::local( + extension, + handles, + node, + node_config, + extension_config, + )) + }, + validate_config: otap_df_config::validation::validate_typed_config::, +}; + +#[cfg(test)] +mod tests { + use super::*; + use http::HeaderMap; + use http::HeaderValue; + use http::header::AUTHORIZATION; + + fn make_auth(token: &str) -> BearerTokenServerAuth { + BearerTokenServerAuth { + expected_token: token.to_string(), + } + } + + #[test] + fn valid_token_is_accepted() { + let auth = make_auth("secret-123"); + let mut headers = HeaderMap::new(); + _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer secret-123")); + assert!(auth.authenticate(&headers).is_ok()); + } + + #[test] + fn missing_authorization_header_is_rejected() { + let auth = make_auth("secret-123"); + let headers = HeaderMap::new(); + let err = auth.authenticate(&headers).unwrap_err(); + assert!(err.message.contains("missing Authorization header")); + } + + #[test] + fn wrong_token_is_rejected() { + let auth = make_auth("correct-token"); + let mut headers = HeaderMap::new(); + _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer wrong-token")); + let err = auth.authenticate(&headers).unwrap_err(); + assert!(err.message.contains("invalid bearer token")); + } + + #[test] + fn non_bearer_scheme_is_rejected() { + let auth = make_auth("secret"); + let mut headers = HeaderMap::new(); + _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Basic dXNlcjpwYXNz")); + let err = auth.authenticate(&headers).unwrap_err(); + assert!(err.message.contains("not a Bearer token")); + } + + #[test] + fn empty_token_in_header_is_rejected() { + let auth = make_auth("secret"); + let mut headers = HeaderMap::new(); + _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer ")); + let err = auth.authenticate(&headers).unwrap_err(); + assert!(err.message.contains("invalid bearer token")); + } + + #[test] + fn handle_wraps_authenticator() { + let auth = make_auth("my-token"); + let handle = ServerAuthenticatorHandle::new(auth); + + let mut good = HeaderMap::new(); + _ = good.insert(AUTHORIZATION, HeaderValue::from_static("Bearer my-token")); + assert!(handle.authenticate(&good).is_ok()); + + let bad = HeaderMap::new(); + assert!(handle.authenticate(&bad).is_err()); + } + + #[test] + fn config_deserializes() { + let json = serde_json::json!({ "token": "abc-123" }); + let cfg: Config = serde_json::from_value(json).unwrap(); + assert_eq!(cfg.token, "abc-123"); + } + + #[test] + fn config_rejects_unknown_fields() { + let json = serde_json::json!({ "token": "abc", "extra": true }); + assert!(serde_json::from_value::(json).is_err()); + } +} diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/mod.rs new file mode 100644 index 0000000000..86244c4a13 --- /dev/null +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/mod.rs @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Extension implementations for contrib nodes. + +/// Azure Identity authentication extension using `azure_identity` credentials. +#[cfg(feature = "azure-identity-auth-extension")] +pub mod azure_identity_auth; + +/// Bearer Token authentication extension for validating incoming requests. +#[cfg(feature = "bearer-token-auth-extension")] +pub mod bearer_token_auth; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs b/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs index 2b5cb59c14..4053e501e1 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/lib.rs @@ -3,6 +3,9 @@ //! Implementation of the Contrib nodes (receiver, exporter, processor). +/// Extension implementations for contrib nodes. +pub mod extensions; + /// Exporter implementations for contrib nodes. pub mod exporters; diff --git a/rust/otap-dataflow/crates/engine/src/extensions/auth.rs b/rust/otap-dataflow/crates/engine/src/extensions/auth.rs index cdceecd111..79bbb901ab 100644 --- a/rust/otap-dataflow/crates/engine/src/extensions/auth.rs +++ b/rust/otap-dataflow/crates/engine/src/extensions/auth.rs @@ -49,9 +49,74 @@ //! } //! ``` +use serde::Deserialize; use std::fmt; use std::sync::{Arc, Mutex}; +// ─── Shared config structs ───────────────────────────────────────────────── + +/// Configuration for client-side authentication on exporter nodes. +/// +/// Exporters that need auth embed this struct in their config so that the +/// field name `auth.authenticator` is consistent across all exporters +/// +/// # YAML example +/// +/// ```yaml +/// azure-monitor-exporter: +/// type: "urn:microsoft:exporter:azure_monitor" +/// config: +/// auth: +/// authenticator: "azure_auth" # node name of the auth extension +/// ``` +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct ClientAuthConfig { + /// The node name of the extension that provides [`ClientAuthenticatorHandle`]. + pub authenticator: String, +} + +impl ClientAuthConfig { + /// Validates that the authenticator reference is non-empty. + pub fn validate(&self) -> Result<(), String> { + if self.authenticator.is_empty() { + return Err("auth.authenticator must be non-empty".to_string()); + } + Ok(()) + } +} + +/// Configuration for server-side authentication on receiver nodes. +/// +/// Receivers that need auth embed this struct in their config so that the +/// field name `auth.authenticator` is consistent across all receivers. +/// +/// # YAML example +/// +/// ```yaml +/// otlp-receiver: +/// type: "urn:otel:receiver:otlp" +/// config: +/// auth: +/// authenticator: "bearer_auth" # node name of the auth extension +/// ``` +#[derive(Debug, Deserialize, Clone)] +#[serde(deny_unknown_fields)] +pub struct ServerAuthConfig { + /// The node name of the extension that provides [`ServerAuthenticatorHandle`]. + pub authenticator: String, +} + +impl ServerAuthConfig { + /// Validates that the authenticator reference is non-empty. + pub fn validate(&self) -> Result<(), String> { + if self.authenticator.is_empty() { + return Err("auth.authenticator must be non-empty".to_string()); + } + Ok(()) + } +} + /// An error returned by authenticator operations. #[derive(Debug, Clone)] pub struct AuthError { diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware.rs index c2073b2584..f424e67548 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware.rs @@ -3,4 +3,5 @@ //! Middlewares for gRPC server +pub mod auth; pub mod zstd_header; diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs new file mode 100644 index 0000000000..14ecbff887 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! gRPC middleware for server-side authentication. +//! +//! This middleware integrates [`ServerAuthenticatorHandle`] with the tonic +//! gRPC server via the `tonic_middleware` crate. When an auth extension is +//! configured, incoming requests are validated before they reach the signal +//! handler; unauthenticated requests receive a gRPC `UNAUTHENTICATED` status. +//! When no auth is configured the middleware is a transparent pass-through. + +use async_trait::async_trait; +use http::{Request, Response}; +use otap_df_engine::extensions::auth::ServerAuthenticatorHandle; +use tonic::body::Body; +use tonic_middleware::{Middleware, ServiceBound}; + +/// A tonic middleware that optionally validates incoming requests using a +/// [`ServerAuthenticatorHandle`]. +/// +/// When `auth` is `Some`, every request is checked against the authenticator +/// and rejected with gRPC `UNAUTHENTICATED` on failure. When `auth` is +/// `None` the middleware is a no-op pass-through, which avoids type-level +/// branching in the server builder. +#[derive(Clone)] +pub struct AuthMiddleware { + auth: Option, +} + +impl AuthMiddleware { + /// Creates a new auth middleware. + /// + /// Pass `Some(handle)` to enforce authentication, or `None` for a + /// transparent pass-through. + pub fn new(auth: Option) -> Self { + Self { auth } + } +} + +#[async_trait] +impl Middleware for AuthMiddleware +where + S: ServiceBound, + S::Future: Send, +{ + async fn call( + &self, + req: Request, + mut service: S, + ) -> Result, S::Error> { + if let Some(auth) = &self.auth { + if let Err(e) = auth.authenticate(req.headers()) { + let status = tonic::Status::unauthenticated(e.message); + return Ok(status.into_http()); + } + } + + service.call(req).await + } +} diff --git a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs index 94bf138d7d..7615d47713 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs @@ -27,6 +27,7 @@ use crate::tls_utils::{build_tls_acceptor, create_tls_stream}; use crate::otap_grpc::common; use crate::otap_grpc::common::AckRegistry; +use crate::otap_grpc::middleware::auth::AuthMiddleware; use crate::otap_grpc::server_settings::GrpcServerSettings; use crate::otlp_http::HttpServerSettings; use crate::shared_concurrency::SharedConcurrencyLayer; @@ -120,6 +121,23 @@ const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1); #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] pub struct Config { + /// Optional server-side authentication. + /// + /// When configured, every incoming request must present valid credentials + /// matching the referenced auth extension (e.g., a bearer token). + /// + /// # Example + /// ```yaml + /// config: + /// auth: + /// authenticator: "bearer_auth" + /// protocols: + /// grpc: + /// listening_addr: "0.0.0.0:4317" + /// ``` + #[serde(default)] + pub auth: Option, + /// Protocol configurations. /// /// At least one protocol (gRPC or HTTP) must be configured. @@ -236,6 +254,13 @@ impl OTLPReceiver { }); } + // Validate auth config if present. + if let Some(auth) = &config.auth { + auth.validate().map_err(|e| { + otap_df_config::error::Error::InvalidUserConfig { error: e } + })?; + } + // Validate that gRPC and HTTP do not have conflicting listening addresses. // Conflicts occur when: // - Same port with either IP being unspecified (0.0.0.0 or ::), since unspecified binds all interfaces @@ -485,8 +510,32 @@ impl shared::Receiver for OTLPReceiver { mut self: Box, mut ctrl_msg_recv: shared::ControlChannel, effect_handler: shared::EffectHandler, - _extension_registry: ExtensionRegistry, + extension_registry: ExtensionRegistry, ) -> Result { + // Resolve the optional auth handle from the extension registry. + let auth_handle = if let Some(auth_config) = &self.config.auth { + let handle = extension_registry + .get::( + &auth_config.authenticator, + ) + .map_err(|e| Error::ReceiverError { + receiver: effect_handler.receiver_id(), + kind: ReceiverErrorKind::Configuration, + error: format!( + "failed to resolve auth extension '{}': {e}", + auth_config.authenticator + ), + source_detail: String::new(), + })?; + otap_df_telemetry::otel_info!( + "otlp.receiver.auth.enabled", + authenticator = %auth_config.authenticator + ); + Some(handle) + } else { + None + }; + let grpc_enabled = self.config.protocols.grpc.is_some(); let both_enabled = self.config.protocols.has_both(); @@ -590,7 +639,11 @@ impl shared::Receiver for OTLPReceiver { }; let mut server = - common::apply_server_tuning(Server::builder(), grpc_config).layer(limit_layer); + common::apply_server_tuning(Server::builder(), grpc_config) + .layer(limit_layer) + .layer(tonic_middleware::MiddlewareLayer::new(AuthMiddleware::new( + auth_handle.clone(), + ))); if let Some(timeout) = grpc_config.timeout { server = server.timeout(timeout); @@ -842,6 +895,7 @@ mod tests { ..Default::default() }; Config { + auth: None, protocols: Protocols { grpc: Some(grpc), http: None, @@ -857,6 +911,7 @@ mod tests { ..Default::default() }; Config { + auth: None, protocols: Protocols { grpc: None, http: Some(http), From 0146994d5167889d81a561c4d056a80149499968 Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Mon, 2 Mar 2026 22:36:17 +0000 Subject: [PATCH 2/3] Code changes --- .../src/extensions/azure_identity_auth/mod.rs | 78 ++++++++----------- 1 file changed, 34 insertions(+), 44 deletions(-) diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs index b0441baed8..29f4620626 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs @@ -12,9 +12,10 @@ //! # Token Refresh //! //! The extension's [`start`](Extension::start) loop proactively refreshes -//! the token before it expires. Consumers call +//! the token before it expires and broadcasts updates via a +//! [`tokio::sync::watch`] channel. Consumers call //! [`ClientAuthenticatorHandle::get_request_metadata`] to pull the latest -//! cached header — no push/update plumbing is required. +//! cached header — the watch channel ensures efficient, lock-free reads. //! //! # Example YAML Configuration //! @@ -48,7 +49,8 @@ use otap_df_engine::node::NodeId; use otap_df_telemetry::{otel_debug, otel_error, otel_info, otel_warn}; use http::header::HeaderValue; use serde::Deserialize; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +use tokio::sync::watch; use otap_df_otap::OTAP_EXTENSION_FACTORIES; @@ -111,22 +113,19 @@ fn default_scope() -> String { /// A [`ClientAuthenticator`] backed by a cached Azure AD bearer token. /// -/// The extension's background loop refreshes the token and stores it in the -/// shared `Arc>`. Consumers (exporters, heartbeat clients, etc.) -/// read the latest cached header on every call to [`get_request_metadata`]. +/// The extension's background loop refreshes the token and broadcasts it via +/// a [`tokio::sync::watch`] channel. Consumers (exporters, heartbeat +/// clients, etc.) read the latest cached header on every call to +/// [`get_request_metadata`] using the watch receiver's lock-free borrow. struct AzureIdentityClientAuth { - cached_header: Arc>>, + token_rx: watch::Receiver>, } impl ClientAuthenticator for AzureIdentityClientAuth { fn get_request_metadata( &self, ) -> Result, AuthError> { - let header = self - .cached_header - .lock() - .expect("cached_header lock poisoned") - .clone(); + let header = self.token_rx.borrow().clone(); match header { Some(h) => Ok(vec![(http::header::AUTHORIZATION, h)]), None => Err(AuthError { @@ -141,12 +140,13 @@ impl ClientAuthenticator for AzureIdentityClientAuth { /// The Azure Identity Auth extension. /// /// Runs a background loop that proactively refreshes an Azure AD access token -/// and stores the formatted `Authorization: Bearer ` header value for -/// consumers to pull via [`ClientAuthenticatorHandle::get_request_metadata`]. +/// and broadcasts the formatted `Authorization: Bearer ` header value +/// via a [`tokio::sync::watch`] channel for consumers to pull via +/// [`ClientAuthenticatorHandle::get_request_metadata`]. struct AzureIdentityAuthExtension { credential: Arc, scope: String, - cached_header: Arc>>, + token_tx: watch::Sender>, } impl AzureIdentityAuthExtension { @@ -261,10 +261,7 @@ impl local::Extension for AzureIdentityAuthExtension { &format!("Bearer {}", access_token.token.secret()), ) { Ok(header) => { - *self - .cached_header - .lock() - .expect("cached_header lock poisoned") = Some(header); + let _ = self.token_tx.send_replace(Some(header)); next_token_refresh = get_next_token_refresh(&access_token); @@ -382,13 +379,12 @@ pub static AZURE_IDENTITY_AUTH_EXTENSION: ExtensionFactory = ExtensionFactory { // Create the Azure credential let credential = create_credential(&cfg)?; - // Shared state: the cached Authorization header - let cached_header: Arc>> = Arc::new(Mutex::new(None)); + // Create watch channel: extension broadcasts token updates, + // authenticator reads the latest value via the receiver. + let (token_tx, token_rx) = watch::channel(None); // Build the ClientAuthenticator handle for consumers - let auth = AzureIdentityClientAuth { - cached_header: cached_header.clone(), - }; + let auth = AzureIdentityClientAuth { token_rx }; let mut handles = ExtensionHandles::new(); handles.register(ClientAuthenticatorHandle::new(auth)); @@ -396,7 +392,7 @@ pub static AZURE_IDENTITY_AUTH_EXTENSION: ExtensionFactory = ExtensionFactory { let extension = AzureIdentityAuthExtension { credential, scope: cfg.scope, - cached_header, + token_tx, }; Ok(ExtensionWrapper::local( @@ -462,10 +458,8 @@ mod tests { #[test] fn test_client_auth_returns_none_before_refresh() { - let cached = Arc::new(Mutex::new(None)); - let auth = AzureIdentityClientAuth { - cached_header: cached, - }; + let (_tx, rx) = watch::channel(None); + let auth = AzureIdentityClientAuth { token_rx: rx }; let err = auth.get_request_metadata().unwrap_err(); assert!(err.message.contains("not yet available")); @@ -473,12 +467,10 @@ mod tests { #[test] fn test_client_auth_returns_cached_header() { - let cached = Arc::new(Mutex::new(Some( + let (_tx, rx) = watch::channel(Some( HeaderValue::from_static("Bearer test-token"), - ))); - let auth = AzureIdentityClientAuth { - cached_header: cached, - }; + )); + let auth = AzureIdentityClientAuth { token_rx: rx }; let metadata = auth.get_request_metadata().unwrap(); assert_eq!(metadata.len(), 1); @@ -488,18 +480,16 @@ mod tests { #[test] fn test_client_auth_sees_updates() { - let cached: Arc>> = Arc::new(Mutex::new(None)); - let auth = AzureIdentityClientAuth { - cached_header: cached.clone(), - }; + let (tx, rx) = watch::channel(None); + let auth = AzureIdentityClientAuth { token_rx: rx }; assert!(auth.get_request_metadata().is_err()); - *cached.lock().unwrap() = Some(HeaderValue::from_static("Bearer v1")); + let _ = tx.send_replace(Some(HeaderValue::from_static("Bearer v1"))); let meta = auth.get_request_metadata().unwrap(); assert_eq!(meta[0].1, "Bearer v1"); - *cached.lock().unwrap() = Some(HeaderValue::from_static("Bearer v2")); + let _ = tx.send_replace(Some(HeaderValue::from_static("Bearer v2"))); let meta = auth.get_request_metadata().unwrap(); assert_eq!(meta[0].1, "Bearer v2"); } @@ -552,12 +542,12 @@ mod tests { let (credential, call_count) = mock_credential("my-azure-token", azure_core::time::Duration::seconds(3600)); - let cached_header: Arc>> = Arc::new(Mutex::new(None)); + let (token_tx, token_rx) = watch::channel(None); let extension = AzureIdentityAuthExtension { credential, scope: "https://monitor.azure.com/.default".to_string(), - cached_header: cached_header.clone(), + token_tx, }; let config = ExtensionConfig::new("test_azure_auth"); @@ -586,8 +576,8 @@ mod tests { // Verify token was acquired assert!(call_count.load(Ordering::SeqCst) >= 1); - // Verify cached header is set - let header = cached_header.lock().unwrap().clone(); + // Verify token is broadcast via watch channel + let header = token_rx.borrow().clone(); assert!(header.is_some()); assert_eq!(header.unwrap(), "Bearer my-azure-token"); From 5092beea32966f271b5ed96da5bb0df0657a152a Mon Sep 17 00:00:00 2001 From: Utkarsh Umesan Pillai <66651184+utpilla@users.noreply.github.com> Date: Thu, 12 Mar 2026 00:14:26 +0000 Subject: [PATCH 3/3] Add BearerToken Extension --- .../azure_monitor_exporter/client.rs | 10 +- .../azure_monitor_exporter/config.rs | 42 ++- .../azure_monitor_exporter/exporter.rs | 11 +- .../azure_monitor_exporter/heartbeat.rs | 5 +- .../in_flight_exports.rs | 9 +- .../azure_monitor_exporter/transformer.rs | 9 +- .../src/extensions/azure_identity_auth/mod.rs | 253 ++++++------- .../src/extensions/bearer_token_auth/mod.rs | 10 +- .../engine/src/extensions/bearer_token.rs | 331 ++++++++++++++++++ .../crates/engine/src/extensions/mod.rs | 4 + .../otap/src/otap_grpc/middleware/auth.rs | 6 +- .../crates/otap/src/otlp_receiver.rs | 16 +- 12 files changed, 539 insertions(+), 167 deletions(-) create mode 100644 rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs index 551960c873..bb61bcaeeb 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/client.rs @@ -379,9 +379,13 @@ mod tests { let http_client = create_test_http_client(); - let client = - LogsIngestionClient::new(&api_config, http_client, create_test_auth(), create_test_metrics()) - .unwrap(); + let client = LogsIngestionClient::new( + &api_config, + http_client, + create_test_auth(), + create_test_metrics(), + ) + .unwrap(); assert!(client.endpoint.contains("dcr-abc-123-def")); assert!(client.endpoint.contains("Custom-Stream_Name")); diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs index ec89f84e9f..e095251f0a 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/config.rs @@ -1,8 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -use otap_df_engine::extensions::auth::ClientAuthConfig; use super::Error; +use otap_df_engine::extensions::auth::ClientAuthConfig; use serde::Deserialize; use serde_json::Value; use std::collections::{HashMap, HashSet}; @@ -58,9 +58,9 @@ impl Config { /// Validate the configuration pub fn validate(&self) -> Result<(), Error> { // Validate auth extension reference - self.auth.validate().map_err(|e| { - Error::Config(format!("Invalid configuration: {e}")) - })?; + self.auth + .validate() + .map_err(|e| Error::Config(format!("Invalid configuration: {e}")))?; // Validate API configuration if self.api.dcr_endpoint.is_empty() { @@ -150,7 +150,9 @@ mod tests { dcr: "mydcr".to_string(), schema: SchemaConfig::default(), }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; assert!(config.validate().is_ok()); @@ -165,7 +167,9 @@ mod tests { dcr: "".to_string(), schema: SchemaConfig::default(), }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; let result = config.validate(); @@ -185,15 +189,19 @@ mod tests { dcr: "mydcr".to_string(), schema: SchemaConfig::default(), }, - auth: ClientAuthConfig { authenticator: "".to_string() }, + auth: ClientAuthConfig { + authenticator: "".to_string(), + }, }; let result = config.validate(); assert!(result.is_err()); - assert!(result - .unwrap_err() - .to_string() - .contains("auth.authenticator must be non-empty")); + assert!( + result + .unwrap_err() + .to_string() + .contains("auth.authenticator must be non-empty") + ); } #[test] @@ -213,7 +221,9 @@ mod tests { ]), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; let result = config.validate(); @@ -253,7 +263,9 @@ mod tests { ]), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; let result = config.validate(); @@ -282,7 +294,9 @@ mod tests { )]), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; let result = config.validate(); diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs index b318195a5b..4f20f811a9 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/exporter.rs @@ -463,11 +463,10 @@ impl Exporter for AzureMonitorExporter { })?; // Create heartbeat handler (needs auth handle) - let mut heartbeat = Heartbeat::new(&self.config.api, auth).map_err(|e| { - EngineError::InternalError { + let mut heartbeat = + Heartbeat::new(&self.config.api, auth).map_err(|e| EngineError::InternalError { message: format!("Failed to create heartbeat handler: {e}"), - } - })?; + })?; // Start periodic telemetry collection and retain the cancel handle for graceful shutdown let telemetry_timer_cancel_handle = effect_handler @@ -596,7 +595,9 @@ mod tests { log_record_mapping: HashMap::new(), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, } } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs index e250d42095..69baff59d3 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/heartbeat.rs @@ -8,10 +8,7 @@ use super::config::ApiConfig; use super::error::Error; use chrono::Utc; use otap_df_engine::extensions::auth::ClientAuthenticatorHandle; -use reqwest::{ - Client, - header::CONTENT_TYPE, -}; +use reqwest::{Client, header::CONTENT_TYPE}; use std::time::Duration; use sysinfo::System; diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs index 44c3847e00..28646d3d98 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/in_flight_exports.rs @@ -126,14 +126,19 @@ mod tests { } fn create_test_client() -> LogsIngestionClient { - use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator, ClientAuthenticatorHandle}; + use otap_df_engine::extensions::auth::{ + AuthError, ClientAuthenticator, ClientAuthenticatorHandle, + }; struct TestAuth; impl ClientAuthenticator for TestAuth { fn get_request_metadata( &self, ) -> Result, AuthError> { - Ok(vec![(http::header::AUTHORIZATION, http::HeaderValue::from_static("Bearer test"))]) + Ok(vec![( + http::header::AUTHORIZATION, + http::HeaderValue::from_static("Bearer test"), + )]) } } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs index 6570530383..1545ced4c0 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/exporters/azure_monitor_exporter/transformer.rs @@ -432,7 +432,9 @@ mod tests { ]), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, } } @@ -734,12 +736,13 @@ mod tests { log_record_mapping: HashMap::new(), }, }, - auth: ClientAuthConfig { authenticator: "azure_auth".to_string() }, + auth: ClientAuthConfig { + authenticator: "azure_auth".to_string(), + }, }; let transformer = Transformer::new(&config, create_test_metrics()); - let request = ExportLogsServiceRequest { resource_logs: vec![ResourceLogs { resource: Some(Resource { diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs index 29f4620626..54875f3116 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/azure_identity_auth/mod.rs @@ -5,17 +5,17 @@ //! //! This extension acquires Azure AD tokens using `azure_identity` credentials //! (Managed Identity or Developer Tools) and exposes them as a -//! [`ClientAuthenticatorHandle`] so that exporters and other pipeline -//! components can attach `Authorization: Bearer ` headers without -//! managing their own token lifecycle. +//! [`BearerTokenProviderHandle`] so that exporters and other pipeline +//! components can obtain bearer tokens without managing their own token +//! lifecycle. //! //! # Token Refresh //! //! The extension's [`start`](Extension::start) loop proactively refreshes //! the token before it expires and broadcasts updates via a //! [`tokio::sync::watch`] channel. Consumers call -//! [`ClientAuthenticatorHandle::get_request_metadata`] to pull the latest -//! cached header — the watch channel ensures efficient, lock-free reads. +//! [`BearerTokenProviderHandle::get_token`] to pull the latest cached +//! token — the watch channel ensures efficient, lock-free reads. //! //! # Example YAML Configuration //! @@ -42,12 +42,13 @@ use otap_df_engine::context::PipelineContext; use otap_df_engine::control::ExtensionControlMsg; use otap_df_engine::error::Error as EngineError; use otap_df_engine::extension::ExtensionWrapper; -use otap_df_engine::extensions::auth::{AuthError, ClientAuthenticator, ClientAuthenticatorHandle}; use otap_df_engine::extensions::ExtensionHandles; +use otap_df_engine::extensions::bearer_token::{ + BearerToken, BearerTokenError, BearerTokenProvider, BearerTokenProviderHandle, +}; use otap_df_engine::local::extension::{self as local, ControlChannel, EffectHandler}; use otap_df_engine::node::NodeId; use otap_df_telemetry::{otel_debug, otel_error, otel_info, otel_warn}; -use http::header::HeaderValue; use serde::Deserialize; use std::sync::Arc; use tokio::sync::watch; @@ -109,29 +110,25 @@ fn default_scope() -> String { "https://monitor.azure.com/.default".to_string() } -// ─── ClientAuthenticator implementation ─────────────────────────────────── +// ─── BearerTokenProvider implementation ──────────────────────────────────── -/// A [`ClientAuthenticator`] backed by a cached Azure AD bearer token. +/// A [`BearerTokenProvider`] backed by a cached Azure AD bearer token. /// -/// The extension's background loop refreshes the token and broadcasts it via -/// a [`tokio::sync::watch`] channel. Consumers (exporters, heartbeat -/// clients, etc.) read the latest cached header on every call to -/// [`get_request_metadata`] using the watch receiver's lock-free borrow. -struct AzureIdentityClientAuth { - token_rx: watch::Receiver>, +/// Reads from a `watch` channel. The extension's background loop refreshes +/// the token and broadcasts `BearerToken` values. +struct AzureIdentityTokenProvider { + token_rx: watch::Receiver>, } -impl ClientAuthenticator for AzureIdentityClientAuth { - fn get_request_metadata( - &self, - ) -> Result, AuthError> { - let header = self.token_rx.borrow().clone(); - match header { - Some(h) => Ok(vec![(http::header::AUTHORIZATION, h)]), - None => Err(AuthError { - message: "Azure AD token not yet available".into(), - }), - } +impl BearerTokenProvider for AzureIdentityTokenProvider { + fn get_token(&self) -> Result { + self.token_rx.borrow().clone().ok_or(BearerTokenError { + message: "Azure AD token not yet available".into(), + }) + } + + fn subscribe_token_refresh(&self) -> watch::Receiver> { + self.token_rx.clone() } } @@ -140,13 +137,12 @@ impl ClientAuthenticator for AzureIdentityClientAuth { /// The Azure Identity Auth extension. /// /// Runs a background loop that proactively refreshes an Azure AD access token -/// and broadcasts the formatted `Authorization: Bearer ` header value -/// via a [`tokio::sync::watch`] channel for consumers to pull via -/// [`ClientAuthenticatorHandle::get_request_metadata`]. +/// and broadcasts [`BearerToken`] values via a [`tokio::sync::watch`] channel +/// for consumers to pull via [`BearerTokenProviderHandle::get_token`]. struct AzureIdentityAuthExtension { credential: Arc, scope: String, - token_tx: watch::Sender>, + bearer_token_tx: watch::Sender>, } impl AzureIdentityAuthExtension { @@ -257,38 +253,27 @@ impl local::Extension for AzureIdentityAuthExtension { _ = tokio::time::sleep_until(next_token_refresh) => { match self.get_token().await { Ok(access_token) => { - match HeaderValue::from_str( - &format!("Bearer {}", access_token.token.secret()), - ) { - Ok(header) => { - let _ = self.token_tx.send_replace(Some(header)); - - next_token_refresh = get_next_token_refresh(&access_token); - - let refresh_in = next_token_refresh - .saturating_duration_since(tokio::time::Instant::now()); - let total_secs = refresh_in.as_secs(); - let hours = total_secs / 3600; - let minutes = (total_secs % 3600) / 60; - let seconds = total_secs % 60; - - otel_info!( - "azure_identity_auth.token_refresh", - refresh_in = - format!("{}h {}m {}s", hours, minutes, seconds) - ); - } - Err(e) => { - otel_error!( - "azure_identity_auth.header_creation_failed", - error = ?e - ); - next_token_refresh = tokio::time::Instant::now() - + tokio::time::Duration::from_secs( - MIN_TOKEN_REFRESH_INTERVAL_SECS, - ); - } - } + let _ = self.bearer_token_tx.send_replace(Some( + BearerToken::new( + access_token.token.secret().to_string(), + access_token.expires_on.unix_timestamp(), + ), + )); + + next_token_refresh = get_next_token_refresh(&access_token); + + let refresh_in = next_token_refresh + .saturating_duration_since(tokio::time::Instant::now()); + let total_secs = refresh_in.as_secs(); + let hours = total_secs / 3600; + let minutes = (total_secs % 3600) / 60; + let seconds = total_secs % 60; + + otel_info!( + "azure_identity_auth.token_refresh", + refresh_in = + format!("{}h {}m {}s", hours, minutes, seconds) + ); } Err(e) => { otel_error!( @@ -331,22 +316,22 @@ fn create_credential( ); } - ManagedIdentityCredential::new(Some(options)).map(|c| c as Arc).map_err(|e| { - otap_df_config::error::Error::InvalidUserConfig { + ManagedIdentityCredential::new(Some(options)) + .map(|c| c as Arc) + .map_err(|e| otap_df_config::error::Error::InvalidUserConfig { error: format!("Failed to create managed identity credential: {e}"), - } - }) + }) } AuthMethod::Development => { otel_info!( "azure_identity_auth.credential_type", method = "developer_tools" ); - DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default())).map(|c| c as Arc).map_err( - |e| otap_df_config::error::Error::InvalidUserConfig { + DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default())) + .map(|c| c as Arc) + .map_err(|e| otap_df_config::error::Error::InvalidUserConfig { error: format!("Failed to create developer tools credential: {e}"), - }, - ) + }) } } } @@ -379,20 +364,21 @@ pub static AZURE_IDENTITY_AUTH_EXTENSION: ExtensionFactory = ExtensionFactory { // Create the Azure credential let credential = create_credential(&cfg)?; - // Create watch channel: extension broadcasts token updates, - // authenticator reads the latest value via the receiver. - let (token_tx, token_rx) = watch::channel(None); + // Create watch channel for broadcasting BearerToken updates + let (bearer_token_tx, bearer_token_rx) = watch::channel(None); - // Build the ClientAuthenticator handle for consumers - let auth = AzureIdentityClientAuth { token_rx }; + // Build the BearerTokenProvider handle + let token_provider = AzureIdentityTokenProvider { + token_rx: bearer_token_rx, + }; let mut handles = ExtensionHandles::new(); - handles.register(ClientAuthenticatorHandle::new(auth)); + handles.register(BearerTokenProviderHandle::new(token_provider)); // Build the extension instance let extension = AzureIdentityAuthExtension { credential, scope: cfg.scope, - token_tx, + bearer_token_tx, }; Ok(ExtensionWrapper::local( @@ -456,44 +442,6 @@ mod tests { ); } - #[test] - fn test_client_auth_returns_none_before_refresh() { - let (_tx, rx) = watch::channel(None); - let auth = AzureIdentityClientAuth { token_rx: rx }; - - let err = auth.get_request_metadata().unwrap_err(); - assert!(err.message.contains("not yet available")); - } - - #[test] - fn test_client_auth_returns_cached_header() { - let (_tx, rx) = watch::channel(Some( - HeaderValue::from_static("Bearer test-token"), - )); - let auth = AzureIdentityClientAuth { token_rx: rx }; - - let metadata = auth.get_request_metadata().unwrap(); - assert_eq!(metadata.len(), 1); - assert_eq!(metadata[0].0, http::header::AUTHORIZATION); - assert_eq!(metadata[0].1, "Bearer test-token"); - } - - #[test] - fn test_client_auth_sees_updates() { - let (tx, rx) = watch::channel(None); - let auth = AzureIdentityClientAuth { token_rx: rx }; - - assert!(auth.get_request_metadata().is_err()); - - let _ = tx.send_replace(Some(HeaderValue::from_static("Bearer v1"))); - let meta = auth.get_request_metadata().unwrap(); - assert_eq!(meta[0].1, "Bearer v1"); - - let _ = tx.send_replace(Some(HeaderValue::from_static("Bearer v2"))); - let meta = auth.get_request_metadata().unwrap(); - assert_eq!(meta[0].1, "Bearer v2"); - } - #[test] fn test_get_next_token_refresh_far_future() { let now = OffsetDateTime::now_utc(); @@ -542,12 +490,12 @@ mod tests { let (credential, call_count) = mock_credential("my-azure-token", azure_core::time::Duration::seconds(3600)); - let (token_tx, token_rx) = watch::channel(None); + let (bearer_token_tx, bearer_token_rx) = watch::channel(None); let extension = AzureIdentityAuthExtension { credential, scope: "https://monitor.azure.com/.default".to_string(), - token_tx, + bearer_token_tx, }; let config = ExtensionConfig::new("test_azure_auth"); @@ -577,9 +525,9 @@ mod tests { assert!(call_count.load(Ordering::SeqCst) >= 1); // Verify token is broadcast via watch channel - let header = token_rx.borrow().clone(); - assert!(header.is_some()); - assert_eq!(header.unwrap(), "Bearer my-azure-token"); + let token = bearer_token_rx.borrow().clone(); + assert!(token.is_some()); + assert_eq!(token.unwrap().token.secret(), "my-azure-token"); // Shutdown sender @@ -631,4 +579,69 @@ mod tests { assert_eq!(cfg.method, AuthMethod::Development); } } + + // ─── BearerTokenProvider tests ──────────────────────────────────────── + + #[test] + fn test_token_provider_returns_error_before_refresh() { + let (_tx, rx) = watch::channel(None); + let provider = AzureIdentityTokenProvider { token_rx: rx }; + + let err = provider.get_token().unwrap_err(); + assert!(err.message.contains("not yet available")); + } + + #[test] + fn test_token_provider_returns_cached_token() { + let token = BearerToken::new("azure-token", 1_700_000_000); + let (_tx, rx) = watch::channel(Some(token)); + let provider = AzureIdentityTokenProvider { token_rx: rx }; + + let result = provider.get_token().unwrap(); + assert_eq!(result.token.secret(), "azure-token"); + assert_eq!(result.expires_on, 1_700_000_000); + } + + #[test] + fn test_token_provider_sees_updates() { + let (tx, rx) = watch::channel(None); + let provider = AzureIdentityTokenProvider { token_rx: rx }; + + assert!(provider.get_token().is_err()); + + let _ = tx.send(Some(BearerToken::new("v1", 100))); + let t = provider.get_token().unwrap(); + assert_eq!(t.token.secret(), "v1"); + + let _ = tx.send(Some(BearerToken::new("v2", 200))); + let t = provider.get_token().unwrap(); + assert_eq!(t.token.secret(), "v2"); + } + + #[tokio::test] + async fn test_token_provider_subscribe_receives_updates() { + let (tx, rx) = watch::channel(None); + let provider = AzureIdentityTokenProvider { token_rx: rx }; + + let mut sub = provider.subscribe_token_refresh(); + let _ = tx.send(Some(BearerToken::new("refreshed", 300))); + sub.changed().await.unwrap(); + + let token = sub.borrow().clone().unwrap(); + assert_eq!(token.token.secret(), "refreshed"); + assert_eq!(token.expires_on, 300); + } + + #[test] + fn test_token_provider_handle_wraps_correctly() { + let (tx, rx) = watch::channel(None); + let provider = AzureIdentityTokenProvider { token_rx: rx }; + let handle = BearerTokenProviderHandle::new(provider); + + assert!(handle.get_token().is_err()); + + let _ = tx.send(Some(BearerToken::new("wrapped", 42))); + let token = handle.get_token().unwrap(); + assert_eq!(token.token.secret(), "wrapped"); + } } diff --git a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs index 2259012d72..c73f574116 100644 --- a/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs +++ b/rust/otap-dataflow/crates/contrib-nodes/src/extensions/bearer_token_auth/mod.rs @@ -205,7 +205,10 @@ mod tests { fn wrong_token_is_rejected() { let auth = make_auth("correct-token"); let mut headers = HeaderMap::new(); - _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer wrong-token")); + _ = headers.insert( + AUTHORIZATION, + HeaderValue::from_static("Bearer wrong-token"), + ); let err = auth.authenticate(&headers).unwrap_err(); assert!(err.message.contains("invalid bearer token")); } @@ -214,7 +217,10 @@ mod tests { fn non_bearer_scheme_is_rejected() { let auth = make_auth("secret"); let mut headers = HeaderMap::new(); - _ = headers.insert(AUTHORIZATION, HeaderValue::from_static("Basic dXNlcjpwYXNz")); + _ = headers.insert( + AUTHORIZATION, + HeaderValue::from_static("Basic dXNlcjpwYXNz"), + ); let err = auth.authenticate(&headers).unwrap_err(); assert!(err.message.contains("not a Bearer token")); } diff --git a/rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs b/rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs new file mode 100644 index 0000000000..f6bab182a2 --- /dev/null +++ b/rust/otap-dataflow/crates/engine/src/extensions/bearer_token.rs @@ -0,0 +1,331 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Bearer token provider handle for extensions. +//! +//! This module defines a token provider contract for extensions that manage +//! bearer authentication tokens (e.g., Azure Managed Identity, OAuth2 flows): +//! +//! - [`BearerTokenProvider`] — trait for components that read cached tokens. +//! - [`BearerTokenProviderHandle`] — a cloneable handle that consumers use to +//! obtain tokens and subscribe to refresh events. +//! +//! # Design +//! +//! Handle methods are **sync** by design. The extension's background task +//! (`start()`) handles all async I/O — token acquisition, retries, refresh +//! scheduling — and broadcasts updates via a `watch` channel. Consumers only +//! read cached state through the handle; they never perform I/O. +//! +//! This matches the Go Collector's auth extension interfaces, where +//! `Server.Authenticate`, `HTTPClient.RoundTripper`, and +//! `GRPCClient.PerRPCCredentials` are all sync. +//! +//! # Examples +//! +//! ## Extension factory — registering the handle +//! +//! ```rust,ignore +//! let (token_tx, token_rx) = watch::channel(None); +//! +//! let provider = MyTokenProvider { token_rx }; +//! let mut handles = ExtensionHandles::new(); +//! handles.register(BearerTokenProviderHandle::new(provider)); +//! ``` +//! +//! ## Exporter — obtaining a cached token +//! +//! ```rust,ignore +//! let handle = extension_registry +//! .get::("my_auth")?; +//! +//! let token = handle.get_token()?; +//! request.headers_mut().insert( +//! http::header::AUTHORIZATION, +//! format!("Bearer {}", token.token.secret()).parse().unwrap(), +//! ); +//! ``` +//! +//! ## Subscribing to token refresh events +//! +//! ```rust,ignore +//! let handle = extension_registry +//! .get::("my_auth")?; +//! +//! let mut token_rx = handle.subscribe_token_refresh(); +//! loop { +//! tokio::select! { +//! _ = token_rx.changed() => { +//! if let Some(token) = token_rx.borrow().as_ref() { +//! // Update headers, etc. +//! } +//! } +//! } +//! } +//! ``` + +use std::borrow::Cow; +use std::fmt; +use std::sync::{Arc, Mutex}; + +// ─── Secret ──────────────────────────────────────────────────────────────── + +/// Represents a secret value that should not be exposed in logs or debug output. +/// +/// The [`Debug`] implementation redacts the actual value. +#[derive(Clone, Eq)] +pub struct Secret(Cow<'static, str>); + +impl Secret { + /// Creates a new `Secret`. + #[must_use] + pub fn new(value: T) -> Self + where + T: Into>, + { + Self(value.into()) + } + + /// Returns the secret value. + #[must_use] + pub fn secret(&self) -> &str { + &self.0 + } +} + +impl PartialEq for Secret { + fn eq(&self, other: &Self) -> bool { + self.secret() == other.secret() + } +} + +impl From for Secret { + fn from(value: String) -> Self { + Self::new(value) + } +} + +impl From<&'static str> for Secret { + fn from(value: &'static str) -> Self { + Self::new(value) + } +} + +impl fmt::Debug for Secret { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("Secret") + } +} + +// ─── BearerToken ─────────────────────────────────────────────────────────── + +/// Represents a bearer token with its expiration time. +/// +/// The token value is wrapped in [`Secret`] to prevent accidental exposure +/// in logs or debug output. +#[derive(Debug, Clone)] +pub struct BearerToken { + /// The token value. + pub token: Secret, + /// The expiration time as a UNIX timestamp (seconds since epoch). + pub expires_on: i64, +} + +impl BearerToken { + /// Creates a new bearer token. + #[must_use] + pub fn new(token: T, expires_on: i64) -> Self + where + T: Into, + { + Self { + token: token.into(), + expires_on, + } + } +} + +// ─── Error ───────────────────────────────────────────────────────────────── + +/// An error returned by bearer token provider operations. +#[derive(Debug, Clone)] +pub struct BearerTokenError { + /// A human-readable description of the failure. + pub message: String, +} + +impl fmt::Display for BearerTokenError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "bearer token error: {}", self.message) + } +} + +impl std::error::Error for BearerTokenError {} + +// ─── Trait ───────────────────────────────────────────────────────────────── + +/// A trait for reading cached bearer authentication tokens. +/// +/// Handle methods are **sync** — the extension's background task handles +/// all async I/O (token acquisition, retries, refresh scheduling) and +/// broadcasts updates via a `watch` channel. Implementations read from +/// that channel; they never perform I/O. +pub trait BearerTokenProvider: Send { + /// Returns the latest cached token. + /// + /// # Errors + /// + /// Returns a [`BearerTokenError`] if no token is available yet + /// (e.g., the extension hasn't completed its first refresh). + fn get_token(&self) -> Result; + + /// Returns a receiver for token refresh notifications. + /// + /// Each call creates an independent subscription. The receiver always + /// contains the latest token value (or `None` if no token has been + /// acquired yet). + fn subscribe_token_refresh(&self) -> tokio::sync::watch::Receiver>; +} + +// ─── Handle ──────────────────────────────────────────────────────────────── + +/// A cloneable handle that consumers use to obtain bearer tokens. +/// +/// Wraps any [`BearerTokenProvider`] behind an `Arc>`. The `Mutex` +/// makes the handle `Sync` (required by tonic services) without requiring +/// `Sync` on the trait itself. The lock is never contended — methods are +/// sync and complete in nanoseconds (reading from a `watch::Receiver`). +#[derive(Clone)] +pub struct BearerTokenProviderHandle { + inner: Arc>>, +} + +impl BearerTokenProviderHandle { + /// Creates a new handle wrapping the given provider implementation. + pub fn new(provider: impl BearerTokenProvider + 'static) -> Self { + Self { + inner: Arc::new(Mutex::new(Box::new(provider))), + } + } + + /// Returns the latest cached token. + /// + /// Delegates to the underlying [`BearerTokenProvider`] implementation. + pub fn get_token(&self) -> Result { + self.inner + .lock() + .expect("BearerTokenProvider lock poisoned") + .get_token() + } + + /// Returns a receiver for token refresh notifications. + /// + /// Delegates to the underlying [`BearerTokenProvider`] implementation. + pub fn subscribe_token_refresh(&self) -> tokio::sync::watch::Receiver> { + self.inner + .lock() + .expect("BearerTokenProvider lock poisoned") + .subscribe_token_refresh() + } +} + +impl fmt::Debug for BearerTokenProviderHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BearerTokenProviderHandle").finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::sync::watch; + + /// A trivial in-memory token provider backed by a watch channel. + struct WatchTokenProvider { + token_rx: watch::Receiver>, + } + + impl BearerTokenProvider for WatchTokenProvider { + fn get_token(&self) -> Result { + self.token_rx.borrow().clone().ok_or(BearerTokenError { + message: "token not yet available".into(), + }) + } + + fn subscribe_token_refresh(&self) -> watch::Receiver> { + self.token_rx.clone() + } + } + + #[test] + fn get_token_returns_error_when_empty() { + let (_tx, rx) = watch::channel(None); + let handle = BearerTokenProviderHandle::new(WatchTokenProvider { token_rx: rx }); + + let err = handle.get_token().unwrap_err(); + assert!(err.message.contains("not yet available")); + } + + #[test] + fn get_token_returns_cached_value() { + let (_tx, rx) = watch::channel(Some(BearerToken::new("my-token", 1_700_000_000))); + let handle = BearerTokenProviderHandle::new(WatchTokenProvider { token_rx: rx }); + + let token = handle.get_token().unwrap(); + assert_eq!(token.token.secret(), "my-token"); + assert_eq!(token.expires_on, 1_700_000_000); + } + + #[test] + fn get_token_sees_updates() { + let (tx, rx) = watch::channel(None); + let handle = BearerTokenProviderHandle::new(WatchTokenProvider { token_rx: rx }); + + assert!(handle.get_token().is_err()); + + let _ = tx.send(Some(BearerToken::new("v1", 100))); + let token = handle.get_token().unwrap(); + assert_eq!(token.token.secret(), "v1"); + + let _ = tx.send(Some(BearerToken::new("v2", 200))); + let token = handle.get_token().unwrap(); + assert_eq!(token.token.secret(), "v2"); + } + + #[tokio::test] + async fn subscribe_receives_updates() { + let (tx, rx) = watch::channel(None); + let handle = BearerTokenProviderHandle::new(WatchTokenProvider { token_rx: rx }); + + let mut sub_rx = handle.subscribe_token_refresh(); + + let _ = tx.send(Some(BearerToken::new("refreshed", 200))); + sub_rx.changed().await.unwrap(); + + let refreshed = sub_rx.borrow().clone().unwrap(); + assert_eq!(refreshed.token.secret(), "refreshed"); + assert_eq!(refreshed.expires_on, 200); + } + + #[test] + fn secret_debug_does_not_leak() { + let s = Secret::new("super-secret-value"); + assert_eq!(format!("{:?}", s), "Secret"); + } + + #[test] + fn secret_equality() { + let a = Secret::new("same"); + let b = Secret::new("same"); + let c = Secret::new("different"); + assert_eq!(a, b); + assert_ne!(a, c); + } + + #[test] + fn bearer_token_from_string() { + let token = BearerToken::new("my-token".to_string(), 42); + assert_eq!(token.token.secret(), "my-token"); + assert_eq!(token.expires_on, 42); + } +} diff --git a/rust/otap-dataflow/crates/engine/src/extensions/mod.rs b/rust/otap-dataflow/crates/engine/src/extensions/mod.rs index b543032fa6..32ba68e384 100644 --- a/rust/otap-dataflow/crates/engine/src/extensions/mod.rs +++ b/rust/otap-dataflow/crates/engine/src/extensions/mod.rs @@ -29,11 +29,15 @@ //! `effect_handler.get_extension_handle::("extension_name")`. pub mod auth; +pub mod bearer_token; pub mod registry; pub use auth::{ AuthError, ClientAuthenticator, ClientAuthenticatorHandle, ServerAuthenticator, ServerAuthenticatorHandle, }; +pub use bearer_token::{ + BearerToken, BearerTokenError, BearerTokenProvider, BearerTokenProviderHandle, Secret, +}; pub(crate) use registry::ExtensionRegistryBuilder; pub use registry::{ExtensionHandles, ExtensionRegistry}; diff --git a/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs index 14ecbff887..928c48fd70 100644 --- a/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs +++ b/rust/otap-dataflow/crates/otap/src/otap_grpc/middleware/auth.rs @@ -43,11 +43,7 @@ where S: ServiceBound, S::Future: Send, { - async fn call( - &self, - req: Request, - mut service: S, - ) -> Result, S::Error> { + async fn call(&self, req: Request, mut service: S) -> Result, S::Error> { if let Some(auth) = &self.auth { if let Err(e) = auth.authenticate(req.headers()) { let status = tonic::Status::unauthenticated(e.message); diff --git a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs index 7615d47713..85fe4f2ee8 100644 --- a/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs +++ b/rust/otap-dataflow/crates/otap/src/otlp_receiver.rs @@ -256,9 +256,8 @@ impl OTLPReceiver { // Validate auth config if present. if let Some(auth) = &config.auth { - auth.validate().map_err(|e| { - otap_df_config::error::Error::InvalidUserConfig { error: e } - })?; + auth.validate() + .map_err(|e| otap_df_config::error::Error::InvalidUserConfig { error: e })?; } // Validate that gRPC and HTTP do not have conflicting listening addresses. @@ -638,12 +637,11 @@ impl shared::Receiver for OTLPReceiver { Either::Right(GlobalConcurrencyLimitLayer::new(grpc_max)) }; - let mut server = - common::apply_server_tuning(Server::builder(), grpc_config) - .layer(limit_layer) - .layer(tonic_middleware::MiddlewareLayer::new(AuthMiddleware::new( - auth_handle.clone(), - ))); + let mut server = common::apply_server_tuning(Server::builder(), grpc_config) + .layer(limit_layer) + .layer(tonic_middleware::MiddlewareLayer::new(AuthMiddleware::new( + auth_handle.clone(), + ))); if let Some(timeout) = grpc_config.timeout { server = server.timeout(timeout);