Skip to content
Closed
7 changes: 4 additions & 3 deletions rust/otap-dataflow/benchmarks/benches/exporter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use tonic::{Request, Response, Status};
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::context::ControllerContext;
use otap_df_engine::control::{Controllable, NodeControlMsg, pipeline_ctrl_msg_channel};
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_otap::otap_exporter::OTAP_EXPORTER_URN;
use otap_df_otap::otlp_grpc::OTLPData;
use otap_df_otap::perf_exporter::exporter::OTAP_PERF_EXPORTER_URN;
Expand Down Expand Up @@ -454,7 +455,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -520,7 +521,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -591,7 +592,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum NodeKind {
// Connector,
/// A merged chain of consecutive processors (experimental).
ProcessorChain,
/// A non-pipeline extension (e.g., auth provider, health check).
Extension,
}

impl From<NodeKind> for Cow<'static, str> {
Expand All @@ -103,6 +105,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
NodeKind::Extension => "extension".into(),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

Expand All @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
format!(
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
),
)),
}
}
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ impl PipelineConfig {
!has_incoming || !has_outgoing
}
NodeKind::Exporter => !has_incoming,
// Extensions are standalone services; they never participate
// in the data-flow graph and must not be pruned.
NodeKind::Extension => false,
};

if should_remove {
Expand Down
14 changes: 14 additions & 0 deletions rust/otap-dataflow/crates/contrib-nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ recordset-kql-processor = [
]
resource-validator-processor = []

contrib-extensions = [
"bearer-auth-extension",
"azure-identity-auth-extension",
]
bearer-auth-extension = [
"dep:http",
]
azure-identity-auth-extension = [
"dep:azure_core",
"dep:azure_identity",
"dep:http",
"dep:rand",
]

[dev-dependencies]
otap-df-engine = { path = "../engine", features = ["test-utils"] }
otap-df-otap = { path = "../otap", features = ["test-utils"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use otap_df_engine::ConsumerEffectHandlerExtension;
use otap_df_engine::context::PipelineContext;
use otap_df_engine::control::{AckMsg, NackMsg, NodeControlMsg};
use otap_df_engine::error::Error as EngineError;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::terminal_state::TerminalState;
Expand Down Expand Up @@ -462,6 +463,7 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, EngineError> {
effect_handler
.info(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use otap_df_engine::control::NodeControlMsg;
use otap_df_engine::control::{AckMsg, NackMsg};
use otap_df_engine::error::Error;
use otap_df_engine::exporter::ExporterWrapper;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::node::NodeId;
Expand Down Expand Up @@ -501,6 +502,7 @@ impl Exporter<OtapPdata> for GenevaExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, Error> {
otel_info!(
"geneva_exporter.start",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Configuration types for the Azure Identity Auth Extension.

use serde::Deserialize;

/// Authentication method for Azure.
#[derive(Debug, Deserialize, Clone, PartialEq, Default)]
#[serde(rename_all = "lowercase")]
pub enum AuthMethod {
/// Use Managed Identity (system or user-assigned with client_id).
#[serde(alias = "msi", alias = "managed_identity")]
#[default]
ManagedIdentity,

/// Use developer tools (Azure CLI, Azure Developer CLI).
#[serde(alias = "dev", alias = "developer", alias = "cli")]
Development,
}

impl std::fmt::Display for AuthMethod {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthMethod::ManagedIdentity => write!(f, "managed_identity"),
AuthMethod::Development => write!(f, "development"),
}
}
}

/// Configuration for the Azure Identity Auth Extension.
#[derive(Debug, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
/// Authentication method to use.
#[serde(default)]
pub method: AuthMethod,

/// Client ID for user-assigned managed identity (optional).
/// Only used when method is ManagedIdentity.
/// If not provided with ManagedIdentity, system-assigned identity will be used.
pub client_id: Option<String>,

/// OAuth scope for token acquisition.
/// Defaults to "https://management.azure.com/.default" for general Azure management.
#[serde(default = "default_scope")]
pub scope: String,
}

impl Default for Config {
fn default() -> Self {
Self {
method: AuthMethod::default(),
client_id: None,
scope: default_scope(),
}
}
}

impl Config {
/// Validate the configuration.
pub fn validate(&self) -> Result<(), super::error::Error> {
if self.scope.is_empty() {
return Err(super::error::Error::Config(
"OAuth scope cannot be empty".to_string(),
));
}
Ok(())
}
}

fn default_scope() -> String {
"https://management.azure.com/.default".to_string()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn default_config() {
let config = Config::default();
assert_eq!(config.method, AuthMethod::ManagedIdentity);
assert!(config.client_id.is_none());
assert_eq!(config.scope, "https://management.azure.com/.default");
}

#[test]
fn auth_method_display() {
assert_eq!(
format!("{}", AuthMethod::ManagedIdentity),
"managed_identity"
);
assert_eq!(format!("{}", AuthMethod::Development), "development");
}

#[test]
fn config_validation_empty_scope() {
let config = Config {
method: AuthMethod::ManagedIdentity,
client_id: None,
scope: String::new(),
};
assert!(config.validate().is_err());
}

#[test]
fn config_validation_valid() {
let config = Config::default();
assert!(config.validate().is_ok());
}

#[test]
fn config_deserialize_managed_identity() {
let json = serde_json::json!({
"method": "managed_identity",
"scope": "https://monitor.azure.com/.default"
});
let cfg: Config = serde_json::from_value(json).unwrap();
assert_eq!(cfg.method, AuthMethod::ManagedIdentity);
assert_eq!(cfg.scope, "https://monitor.azure.com/.default");
}

#[test]
fn config_deserialize_development() {
let json = serde_json::json!({
"method": "development",
"scope": "https://monitor.azure.com/.default"
});
let cfg: Config = serde_json::from_value(json).unwrap();
assert_eq!(cfg.method, AuthMethod::Development);
}

#[test]
fn config_rejects_unknown_fields() {
let json = serde_json::json!({
"method": "managed_identity",
"scope": "https://test.scope",
"unknown_field": true
});
assert!(serde_json::from_value::<Config>(json).is_err());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//! Error types for the Azure Identity Auth Extension.

use super::config::AuthMethod;

/// Error definitions for Azure Identity Auth Extension.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error during configuration of a component.
#[error("Configuration error: {0}")]
Config(String),

/// Authentication/authorization error.
#[error("Auth error ({kind})")]
Auth {
/// The kind of authentication error.
kind: AuthErrorKind,
/// The underlying Azure error, if any.
#[source]
source: Option<azure_core::error::Error>,
},
}

/// Specific authentication error variants.
#[derive(Debug, Clone, PartialEq)]
pub enum AuthErrorKind {
/// Failed to create the credential provider.
CreateCredential {
/// The authentication method that failed.
method: AuthMethod,
},

/// Failed to acquire a token.
TokenAcquisition,
}

impl std::fmt::Display for AuthErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AuthErrorKind::CreateCredential { method } => {
write!(f, "failed to create credential for method: {}", method)
}
AuthErrorKind::TokenAcquisition => write!(f, "failed to acquire token"),
}
}
}

impl Error {
/// Creates a new credential creation error.
#[must_use]
pub fn create_credential(method: AuthMethod, source: azure_core::error::Error) -> Self {
Error::Auth {
kind: AuthErrorKind::CreateCredential { method },
source: Some(source),
}
}

/// Creates a new token acquisition error.
#[must_use]
pub fn token_acquisition(source: azure_core::error::Error) -> Self {
Error::Auth {
kind: AuthErrorKind::TokenAcquisition,
source: Some(source),
}
}
}
Loading