[DRAFT] OTAP Dataflow extension support#1958

Closed
gouslu wants to merge 10 commits into open-telemetry:main from gouslu:gouslu/extension-support

Contributor

@gouslu gouslu commented Feb 5, 2026

Extensions Architecture

Draft PR to discuss a proposed extensions architecture for the OTAP Dataflow Engine. Please feel free to
participate in the discussion on issue #1966.

Overview

Extensions are pipeline components that provide shared services to other components
(receivers, processors, exporters). Unlike data-processing components, extensions don't
participate in the data flow—instead, they expose capabilities that other components
can look up and use. Like other components, extensions have a start method and can
run recurring tasks. They can also implement first-class traits and expose them
through the get_extension interface on the effect handler.

Example use cases:

  • Authentication (providing tokens to exporters)
  • Shared resources (connection pools, rate limiters)

Benefits

Reusability: One extension can be used by multiple components. Configure an
authentication extension once, and any number of exporters can reference it.

Multiple instances: Create multiple instances of the same extension type with
different configurations (e.g., separate auth for different components, or a single
component with multi-tenant support).

Configurability: Extensions are configured by name in YAML. Components reference
them by name, and the engine handles the wiring.

API stability: Extension traits are first-class interfaces defined in the engine.
The sealed trait pattern locks down the API surface, ensuring stability for both
extension authors and consumers.

Example configuration:

nodes:
  # Define the extension once
  my-auth-extension:
    kind: extension
    plugin_urn: "urn:otel:azureidentityauth:extension"
    config:
      method: managed_identity
      scope: "https://example.com/.default"

  # First exporter references it by name
  exporter-1:
    kind: exporter
    plugin_urn: "urn:otel:azuremonitor:exporter"
    config:
      auth: my-auth-extension  # <- reference by name
      endpoint: "https://..."

  # Second exporter can use the same extension
  exporter-2:
    kind: exporter
    plugin_urn: "urn:otel:azuremonitor:exporter"
    config:
      auth: my-auth-extension  # <- same extension
      endpoint: "https://..."

Both exporters share the same extension instance.

Multiple instances: You can create multiple instances of the same extension type
with different configurations. Each instance has its own name:

nodes:
  # Production auth with managed identity
  prod-auth:
    kind: extension
    plugin_urn: "urn:otel:azureidentityauth:extension"
    config:
      method: managed_identity
      scope: "https://prod.example.com/.default"

  # Dev auth with CLI credentials
  dev-auth:
    kind: extension
    plugin_urn: "urn:otel:azureidentityauth:extension"
    config:
      method: dev
      scope: "https://dev.example.com/.default"

  prod-exporter:
    kind: exporter
    plugin_urn: "urn:otel:azuremonitor:exporter"
    config:
      auth: prod-auth

  dev-exporter:
    kind: exporter
    plugin_urn: "urn:otel:azuremonitor:exporter"
    config:
      auth: dev-auth
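
Since components reference extensions purely by name, a dangling reference is an easy configuration mistake. The following is a minimal sketch of a validation pass the engine could run before wiring; the `Node`, `NodeKind`, and `dangling_auth_refs` names are hypothetical and not part of this PR.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified view of a configured node.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum NodeKind {
    Extension,
    Exporter,
}

pub struct Node {
    pub kind: NodeKind,
    /// Value of the `auth` key in the node's config, if present.
    pub auth: Option<String>,
}

/// Returns one message per node whose `auth` entry does not name an extension node.
pub fn dangling_auth_refs(nodes: &HashMap<String, Node>) -> Vec<String> {
    let mut errors: Vec<String> = nodes
        .iter()
        .filter_map(|(name, node)| {
            // Nodes without an `auth` reference are skipped.
            let target = node.auth.as_ref()?;
            match nodes.get(target) {
                Some(n) if n.kind == NodeKind::Extension => None,
                _ => Some(format!(
                    "{}: auth references unknown extension '{}'",
                    name, target
                )),
            }
        })
        .collect();
    // HashMap iteration order is unspecified, so sort for stable output.
    errors.sort();
    errors
}
```

With the configuration above, every `auth` entry resolves and the function returns an empty list; misspelling `dev-auth` in `dev-exporter` would yield one message.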

Architecture

  • ExtensionRegistry: Maps extension names to their capabilities
  • ExtensionBundle: Collection of trait implementations for one extension
  • Extension Traits: Capabilities like BearerTokenProvider

Components access extensions via their effect handler:

let auth: Arc<dyn BearerTokenProvider> = effect_handler
    .get_extension::<dyn BearerTokenProvider>("azure_auth")?;
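
To make the relationship between the three pieces concrete, here is a minimal std-only sketch of a name-keyed registry over type-keyed bundles. It assumes the bundle is keyed by the `TypeId` of the trait-object type; the actual storage in the PR may differ, and `token()` is a simplified, synchronous stand-in for the real trait method.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the PR's `BearerTokenProvider`.
pub trait BearerTokenProvider: Send + Sync {
    fn token(&self) -> String;
}

/// Capabilities of one extension, keyed by the `TypeId` of the trait-object type.
#[derive(Default)]
pub struct ExtensionBundle {
    // Each box holds an `Arc<T>`, where `T` is the trait-object type used as key.
    entries: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ExtensionBundle {
    pub fn insert<T: ?Sized + Send + Sync + 'static>(&mut self, value: Arc<T>) {
        self.entries.insert(TypeId::of::<T>(), Box::new(value));
    }

    pub fn get<T: ?Sized + Send + Sync + 'static>(&self) -> Option<Arc<T>> {
        self.entries
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<Arc<T>>())
            .cloned()
    }
}

/// Maps extension names to their bundles.
#[derive(Default)]
pub struct ExtensionRegistry {
    bundles: HashMap<String, ExtensionBundle>,
}

impl ExtensionRegistry {
    pub fn register(&mut self, name: &str, bundle: ExtensionBundle) {
        self.bundles.insert(name.to_string(), bundle);
    }

    /// Simplified lookup returning `Option`; the PR's version returns a `Result`.
    pub fn get_extension<T: ?Sized + Send + Sync + 'static>(&self, name: &str) -> Option<Arc<T>> {
        self.bundles.get(name)?.get::<T>()
    }
}

/// Hypothetical extension used only for illustration.
struct StaticToken(String);

impl BearerTokenProvider for StaticToken {
    fn token(&self) -> String {
        self.0.clone()
    }
}

pub fn demo_registry() -> ExtensionRegistry {
    let mut bundle = ExtensionBundle::default();
    bundle.insert::<dyn BearerTokenProvider>(Arc::new(StaticToken("secret".to_string())));
    let mut registry = ExtensionRegistry::default();
    registry.register("azure_auth", bundle);
    registry
}
```

Keying by `TypeId` lets one lookup method serve every capability trait without the registry knowing the concrete extension type.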

Key Components

Extension Traits

Extension traits define capabilities. They use a sealed trait pattern—only traits
defined in otap-df-engine::extensions can be used with the extension system.

pub trait ExtensionTrait: private::Sealed + Send + Sync {}

impl private::Sealed for dyn BearerTokenProvider {}
impl ExtensionTrait for dyn BearerTokenProvider {}
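
For readers unfamiliar with the sealed trait pattern, the sketch below shows why code outside the module cannot opt a new trait in. The module layout and the `capability_key` helper are illustrative assumptions, not the PR's actual code.

```rust
use std::any::TypeId;

pub mod extensions {
    use std::any::TypeId;

    mod private {
        // The module is private, so code outside `extensions` cannot name
        // `Sealed` and therefore cannot implement it.
        pub trait Sealed {}
    }

    /// Marker for trait-object types accepted by the extension system.
    pub trait ExtensionTrait: private::Sealed + Send + Sync {}

    /// Simplified stand-in for the PR's capability trait.
    pub trait BearerTokenProvider: Send + Sync {
        fn token(&self) -> String;
    }

    // Only code inside `extensions` can write these two impls.
    impl private::Sealed for dyn BearerTokenProvider {}
    impl ExtensionTrait for dyn BearerTokenProvider {}

    /// Hypothetical helper that accepts only sealed trait-object types.
    pub fn capability_key<T: ?Sized + ExtensionTrait + 'static>() -> TypeId {
        TypeId::of::<T>()
    }
}

pub fn bearer_key() -> TypeId {
    // Compiles because `dyn BearerTokenProvider` is sealed in.
    // `extensions::capability_key::<dyn std::fmt::Debug>()` would be rejected
    // at compile time because `dyn Debug` does not implement `ExtensionTrait`.
    extensions::capability_key::<dyn extensions::BearerTokenProvider>()
}
```

The trade-off is that third parties can implement existing capability traits on their own types but cannot introduce new capability traits without a change to the engine crate.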

ExtensionBundle

A bundle holds all trait implementations for a single extension. One extension
can implement multiple traits:

impl BearerTokenProvider for MyAuthExtension { /* ... */ }
impl CredentialRefresher for MyAuthExtension { /* ... */ }

let bundle = extension_bundle!(extension => BearerTokenProvider, CredentialRefresher);

The extension_bundle! Macro

Simplifies bundle creation:

// This:
let bundle = extension_bundle!(extension => BearerTokenProvider, CredentialRefresher);

// Expands to:
let mut bundle = ExtensionBundle::new();
bundle.insert::<dyn BearerTokenProvider>(extension.clone() as Arc<dyn BearerTokenProvider>);
bundle.insert::<dyn CredentialRefresher>(extension.clone() as Arc<dyn CredentialRefresher>);

The macro enforces compile-time type safety—only extension traits work.
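
One way such a macro could be written with `macro_rules!` is sketched below. This is an assumption about the shape of the implementation, not the PR's code: the bundle is reduced to a `TypeId`-keyed map, `ExtensionTrait` is shown unsealed for brevity, and the trait methods are simplified. The compile-time check comes from the `ExtensionTrait` bound on `insert`.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Marker trait (unsealed in this sketch) for accepted trait-object types.
pub trait ExtensionTrait: Send + Sync {}

pub trait BearerTokenProvider: Send + Sync {
    fn token(&self) -> String;
}
pub trait CredentialRefresher: Send + Sync {
    fn refresh(&self) -> bool;
}

impl ExtensionTrait for dyn BearerTokenProvider {}
impl ExtensionTrait for dyn CredentialRefresher {}

#[derive(Default)]
pub struct ExtensionBundle {
    entries: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ExtensionBundle {
    // The `ExtensionTrait` bound rejects any `dyn Trait` that was not opted in.
    pub fn insert<T: ?Sized + ExtensionTrait + 'static>(&mut self, value: Arc<T>) {
        self.entries.insert(TypeId::of::<T>(), Box::new(value));
    }

    pub fn contains<T: ?Sized + 'static>(&self) -> bool {
        self.entries.contains_key(&TypeId::of::<T>())
    }

    pub fn len(&self) -> usize {
        self.entries.len()
    }
}

/// Inserts one `Arc<dyn Trait>` view of `$ext` per listed trait.
macro_rules! extension_bundle {
    ($ext:expr => $($tr:ident),+ $(,)?) => {{
        let mut bundle = ExtensionBundle::default();
        $( bundle.insert::<dyn $tr>($ext.clone() as Arc<dyn $tr>); )+
        bundle
    }};
}

/// Hypothetical extension implementing both capabilities.
struct MyAuthExtension;

impl BearerTokenProvider for MyAuthExtension {
    fn token(&self) -> String {
        "secret".to_string()
    }
}
impl CredentialRefresher for MyAuthExtension {
    fn refresh(&self) -> bool {
        true
    }
}

pub fn build_bundle() -> ExtensionBundle {
    let extension = Arc::new(MyAuthExtension);
    extension_bundle!(extension => BearerTokenProvider, CredentialRefresher)
}
```

Listing a trait that the extension does not implement fails at the `as Arc<dyn Trait>` cast, and listing a trait without an `ExtensionTrait` impl fails at the `insert` bound.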

ExtensionRegistry

Maps extension names to bundles. Built once during pipeline initialization,
then shared immutably with all components.

Why Arc<dyn Trait>?

Extensions are stored as Arc<dyn Trait> because multiple components need shared
access to the same instance. When an extension implements multiple traits, the same
Arc is cast to each trait—all lookups return the same underlying instance.

let auth: Arc<dyn BearerTokenProvider> = effect_handler
    .get_extension::<dyn BearerTokenProvider>("azure_auth")?;
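
The "same underlying instance" property can be checked with plain `std`. In this sketch the extension, its `generation` counter, and the synchronous trait methods are hypothetical simplifications; the point is only that two trait-object views of one `Arc` observe the same state.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

pub trait BearerTokenProvider: Send + Sync {
    fn token(&self) -> String;
}
pub trait CredentialRefresher: Send + Sync {
    fn refresh(&self);
}

/// Hypothetical extension whose token changes on every refresh.
struct MyAuthExtension {
    generation: AtomicU64,
}

impl BearerTokenProvider for MyAuthExtension {
    fn token(&self) -> String {
        format!("token-v{}", self.generation.load(Ordering::SeqCst))
    }
}
impl CredentialRefresher for MyAuthExtension {
    fn refresh(&self) {
        self.generation.fetch_add(1, Ordering::SeqCst);
    }
}

/// Casts one allocation to two different trait-object views.
pub fn shared_views() -> (Arc<dyn BearerTokenProvider>, Arc<dyn CredentialRefresher>) {
    let extension = Arc::new(MyAuthExtension {
        generation: AtomicU64::new(0),
    });
    let provider: Arc<dyn BearerTokenProvider> = extension.clone();
    let refresher: Arc<dyn CredentialRefresher> = extension;
    (provider, refresher)
}
```

A refresh performed through the `CredentialRefresher` view is immediately visible through the `BearerTokenProvider` view, and the strong count confirms that both handles point at a single allocation.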

Extension Lifecycle

  1. Registration: Extensions are registered via ExtensionFactory using the
    distributed_slice pattern for automatic discovery.

  2. Creation: During pipeline initialization, the engine creates each extension
    and collects their ExtensionBundles.

  3. Registry Build: All bundles are combined into an immutable ExtensionRegistry.

  4. Distribution: The registry is cloned to each component's effect handler.

  5. Start: Extensions run their main loop (e.g., token refresh) until shutdown.

  6. Consumption: Components call effect_handler.get_extension() to access
    extension capabilities.
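
Steps 1 through 4 can be sketched without the engine. The example below replaces the `distributed_slice` with a plain static table and collapses the registry to a single capability per extension; `FACTORIES`, `build_registry`, the example URN, and the string-typed config are all illustrative assumptions.

```rust
use std::collections::HashMap;
use std::sync::Arc;

pub trait BearerTokenProvider: Send + Sync {
    fn token(&self) -> String;
}

/// Hypothetical factory entry: a URN plus a constructor taking a scope string.
pub struct ExtensionFactory {
    pub urn: &'static str,
    pub create: fn(&str) -> Arc<dyn BearerTokenProvider>,
}

struct StaticAuth {
    scope: String,
}

impl BearerTokenProvider for StaticAuth {
    fn token(&self) -> String {
        format!("token-for-{}", self.scope)
    }
}

fn create_static_auth(scope: &str) -> Arc<dyn BearerTokenProvider> {
    Arc::new(StaticAuth {
        scope: scope.to_string(),
    })
}

// Step 1 (registration): stand-in for the distributed slice.
pub static FACTORIES: &[ExtensionFactory] = &[ExtensionFactory {
    urn: "urn:example:staticauth:extension",
    create: create_static_auth,
}];

pub type Registry = Arc<HashMap<String, Arc<dyn BearerTokenProvider>>>;

/// Steps 2-3: create each configured `(name, urn, scope)` node and freeze the result.
pub fn build_registry(nodes: &[(&str, &str, &str)]) -> Result<Registry, String> {
    let mut registry = HashMap::new();
    for (name, urn, scope) in nodes {
        let factory = FACTORIES
            .iter()
            .find(|f| f.urn == *urn)
            .ok_or_else(|| format!("unknown extension URN: {}", urn))?;
        registry.insert(name.to_string(), (factory.create)(scope));
    }
    // Step 4 (distribution): callers clone the returned `Arc` once per component.
    Ok(Arc::new(registry))
}
```

Because the registry is immutable after construction, handing each component a cloned `Arc` needs no locking on the lookup path.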

Implementing an Extension

Example: Azure Identity Auth Extension

The AzureIdentityAuthExtension provides Azure authentication tokens to exporters.
Here's how it's structured:

1. Configuration (config.rs)

#[derive(Debug, Deserialize, Clone)]
pub struct Config {
    /// Authentication method to use.
    #[serde(default)]
    pub method: AuthMethod,
    
    /// Client ID for user-assigned managed identity (optional).
    pub client_id: Option<String>,
    
    /// OAuth scope for token acquisition.
    #[serde(default = "default_scope")]
    pub scope: String,
}

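#[derive(Debug, Deserialize, Clone, PartialEq, Default)]
// snake_case so that `ManagedIdentity` matches `method: managed_identity` in the YAML examples.
#[serde(rename_all = "snake_case")]
pub enum AuthMethod {
    #[default]
    ManagedIdentity,
    // The alias accepts the short form `method: dev` used in the YAML examples.
    #[serde(alias = "dev")]
    Development,
}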

2. Extension Implementation (extension.rs)

pub struct AzureIdentityAuthExtension {
    credential: Arc<dyn TokenCredential>,
    scope: String,
    method: AuthMethod,
    token_sender: watch::Sender<Option<BearerToken>>,
}

// Implement the extension trait for capability exposure
#[async_trait]
impl BearerTokenProvider for AzureIdentityAuthExtension {
    async fn get_token(&self) -> Result<BearerToken, Error> {
        let access_token = self.get_token_internal().await?;
        Ok(BearerToken::new(
            access_token.token.secret().to_string(),
            access_token.expires_on.unix_timestamp(),
        ))
    }

    fn subscribe_token_refresh(&self) -> watch::Receiver<Option<BearerToken>> {
        self.token_sender.subscribe()
    }
}

// Implement the Extension trait for lifecycle management
#[async_trait(?Send)]
impl Extension<OtapPdata> for AzureIdentityAuthExtension {
    async fn start(
        self: Arc<Self>,
        mut msg_chan: MessageChannel<OtapPdata>,
        effect_handler: EffectHandler<OtapPdata>,
    ) -> Result<TerminalState, EngineError> {
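        // Refresh immediately on startup (an assumed policy for this sketch).
        let mut next_token_refresh = tokio::time::Instant::now();

        // Main event loop - handle control messages and proactive token refresh
        loop {
            tokio::select! {
                biased;
                
                // Proactive token refresh
                _ = tokio::time::sleep_until(next_token_refresh) => {
                    // Refresh token and broadcast to subscribers
                    let token = self.get_token().await?;
                    // `send` only fails when every receiver has been dropped; not fatal here.
                    let _ = self.token_sender.send(Some(token));
                    // Schedule next refresh. The 5-minute interval is a placeholder;
                    // in practice it would be derived from the token's expiry.
                    next_token_refresh = tokio::time::Instant::now()
                        + tokio::time::Duration::from_secs(300);
                }
                
                // Handle control messages (shutdown, config updates, etc.)
                msg = msg_chan.recv() => {
                    match msg? {
                        Message::Control(NodeControlMsg::Shutdown { .. }) => break,
                        // ... handle other messages
                        _ => {}
                    }
                }
            }
        }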
        Ok(TerminalState::default())
    }
}
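
The "Schedule next refresh" step is left open in the loop above. One plausible policy, sketched here as an assumption rather than the PR's behavior, is to refresh a fixed margin before expiry while never scheduling sooner than a minimum delay. `next_refresh_delay` and its parameters are hypothetical names.

```rust
use std::time::Duration;

/// Computes how long to wait before the next proactive refresh.
///
/// `expires_on_unix` and `now_unix` are Unix timestamps in seconds. The token is
/// refreshed `margin` before it expires, but never sooner than `min_delay` from
/// now, so an expired or nearly expired token cannot cause a tight retry loop.
pub fn next_refresh_delay(
    expires_on_unix: i64,
    now_unix: i64,
    margin: Duration,
    min_delay: Duration,
) -> Duration {
    // Seconds until expiry, clamped at zero for already-expired tokens.
    let remaining = expires_on_unix.saturating_sub(now_unix).max(0) as u64;
    let target = Duration::from_secs(remaining).saturating_sub(margin);
    target.max(min_delay)
}
```

For a token valid for one hour with a five-minute margin this yields 55 minutes; for a token that expires within the margin it falls back to `min_delay`.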

3. Factory Registration (mod.rs)

pub const AZURE_IDENTITY_AUTH_EXTENSION_URN: &str = "urn:otel:azureidentityauth:extension";

#[distributed_slice(OTAP_EXTENSION_FACTORIES)]
pub static AZURE_IDENTITY_AUTH_EXTENSION: ExtensionFactory<OtapPdata> = ExtensionFactory {
    name: AZURE_IDENTITY_AUTH_EXTENSION_URN,
    create: |_ctx, node, node_config, extension_config| {
        // Parse and validate configuration
        let cfg: Config = serde_json::from_value(node_config.config.clone())?;
        cfg.validate()?;
        
        // Create the extension
        let extension = Arc::new(AzureIdentityAuthExtension::new(cfg)?);
        
        // Return wrapper with extension bundle declaring capabilities
        Ok(ExtensionWrapper::local(
            extension.clone(),
            extension_bundle!(extension => BearerTokenProvider),
            node,
            node_config,
            extension_config,
        ))
    },
};

4. Pipeline Configuration (YAML)

extensions:
  azure_auth:
    urn: urn:otel:azureidentityauth:extension
    config:
      method: managed_identity
      scope: https://monitor.azure.com/.default

exporters:
  azure_monitor:
    urn: urn:otel:azuremonitor:exporter
    config:
      auth_extension: azure_auth  # Reference by name
      endpoint: https://...

Consuming Extensions

Components access extensions through their effect handler:

// In an exporter's start() method
async fn start(
    self: Arc<Self>,
    mut msg_chan: MessageChannel<PData>,
    effect_handler: EffectHandler<PData>,
) -> Result<TerminalState, Error> {
    // Look up the auth extension by name
    let auth_provider: Arc<dyn BearerTokenProvider> = effect_handler
        .get_extension::<dyn BearerTokenProvider>("azure_auth")?;
    
    // Subscribe to token refresh events
    let mut token_rx = auth_provider.subscribe_token_refresh();
    
    loop {
        tokio::select! {
            // React to token refreshes
            changed = token_rx.changed() => {
                // `changed()` errors once the sender is dropped; stop instead of spinning.
                if changed.is_err() {
                    break;
                }
                if let Some(token) = token_rx.borrow().as_ref() {
                    self.update_auth_header(token);
                }
            }
            
            // Process data
            msg = msg_chan.recv() => {
                // Use current token for requests
                let token = auth_provider.get_token().await?;
                self.send_with_auth(msg, &token).await?;
            }
        }
    }

    Ok(TerminalState::default())
}

Adding New Extension Traits

To add a new extension trait:

  1. Define the trait in otap-df-engine/src/extensions/:
// my_new_trait.rs
#[async_trait]
pub trait MyNewCapability: Send + Sync {
    async fn do_something(&self) -> Result<(), Error>;
}
  2. Implement ExtensionTrait in mod.rs:
impl private::Sealed for dyn MyNewCapability {}
impl ExtensionTrait for dyn MyNewCapability {}
  3. Declare the module and re-export the trait in mod.rs:
pub mod my_new_trait;
pub use my_new_trait::MyNewCapability;

Thread Safety

Extensions support two modes:

  • Local (!Send): Extension runs on a single thread. Use when the extension
    uses thread-local state or !Send types.

  • Shared (Send): Extension can be shared across threads. Required for
    extensions that need to run on the shared runtime.

The ExtensionWrapper enum abstracts over both modes, and the extension registry
stores Arc<dyn Trait> references that are always Send + Sync.
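
The `Send + Sync` requirement on registry entries is what allows a single handle to be cloned into several worker threads. The sketch below uses `std::thread` in place of the engine's runtime; `StaticToken`, `static_provider`, and `tokens_from_workers` are hypothetical helpers.

```rust
use std::sync::Arc;
use std::thread;

/// `Send + Sync` supertraits make `Arc<dyn BearerTokenProvider>` shareable across threads.
pub trait BearerTokenProvider: Send + Sync {
    fn token(&self) -> String;
}

struct StaticToken(String);

impl BearerTokenProvider for StaticToken {
    fn token(&self) -> String {
        self.0.clone()
    }
}

pub fn static_provider(token: &str) -> Arc<dyn BearerTokenProvider> {
    Arc::new(StaticToken(token.to_string()))
}

/// Spawns `workers` threads that each read a token through a cloned handle.
pub fn tokens_from_workers(provider: Arc<dyn BearerTokenProvider>, workers: usize) -> Vec<String> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let provider = Arc::clone(&provider);
            // Moving the clone into the thread compiles only because the
            // trait object is `Send + Sync`.
            thread::spawn(move || provider.token())
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}
```

Dropping the `Send + Sync` supertraits from the trait would turn the `thread::spawn` call into a compile error, which is the guarantee the registry relies on.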

Error Handling

Extension lookup can fail with:

  • ExtensionError::NotFound: No extension with that name exists
  • ExtensionError::TraitNotImplemented: Extension exists but doesn't
    implement the requested trait

Components should handle these errors appropriately:

match effect_handler.get_extension::<dyn BearerTokenProvider>("auth") {
    Ok(provider) => { /* use provider */ }
    Err(ExtensionError::NotFound { name }) => {
        // Extension not configured - maybe use default auth?
    }
    Err(ExtensionError::TraitNotImplemented { name, expected }) => {
        // Configuration error - extension doesn't provide needed capability
        return Err(Error::config(format!(
            "Extension '{}' does not implement {}", name, expected
        )));
    }
}
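
A self-contained version of this policy is sketched below. The variant names and fields mirror the ones used above, while the `Display` wording, the `AuthMode` type, and the decision to fall back to anonymous access are assumptions made for illustration.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum ExtensionError {
    NotFound { name: String },
    TraitNotImplemented { name: String, expected: &'static str },
}

impl fmt::Display for ExtensionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ExtensionError::NotFound { name } => {
                write!(f, "extension '{}' not found", name)
            }
            ExtensionError::TraitNotImplemented { name, expected } => {
                write!(f, "extension '{}' does not implement {}", name, expected)
            }
        }
    }
}

impl std::error::Error for ExtensionError {}

/// Hypothetical outcome of resolving a component's auth configuration.
#[derive(Debug, PartialEq)]
pub enum AuthMode {
    Anonymous,
    Bearer(String),
}

/// Treats a missing extension as "no auth" and a missing capability as a hard error.
pub fn resolve_auth(lookup: Result<String, ExtensionError>) -> Result<AuthMode, ExtensionError> {
    match lookup {
        Ok(token) => Ok(AuthMode::Bearer(token)),
        Err(ExtensionError::NotFound { .. }) => Ok(AuthMode::Anonymous),
        Err(err) => Err(err),
    }
}
```

Whether `NotFound` should be recoverable depends on the component: an exporter that requires authentication would more likely treat both variants as configuration errors.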

@github-actions github-actions Bot added the rust Pull requests that update Rust code label Feb 5, 2026

codecov Bot commented Feb 5, 2026

Codecov Report

❌ Patch coverage is 54.95243% with 805 lines in your changes missing coverage. Please review.
✅ Project coverage is 85.09%. Comparing base (65b8bec) to head (eca84f2).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1958      +/-   ##
==========================================
- Coverage   85.47%   85.09%   -0.38%     
==========================================
  Files         512      518       +6     
  Lines      158299   159362    +1063     
==========================================
+ Hits       135307   135612     +305     
- Misses      22458    23216     +758     
  Partials      534      534              
Components              Coverage Δ
otap-dataflow           86.64% <54.95%> (-0.58%) ⬇️
query_abstraction       80.61% <ø> (ø)
query_engine            90.23% <ø> (ø)
syslog_cef_receivers    ∅ <ø> (∅)
otel-arrow-go           53.50% <ø> (ø)
quiver                  91.63% <ø> (ø)

@jmacd jmacd changed the title Gouslu/extension support [DRAFT] OTAP Dataflow extension support Feb 10, 2026
@github-actions

This pull request has been marked as stale due to lack of recent activity. It will be closed in 30 days if no further activity occurs. If this PR is still relevant, please comment or push new commits to keep it active.

@github-actions github-actions Bot added the stale Not actively pursued label Feb 25, 2026
Contributor

jmacd commented Mar 5, 2026

In favor of #2141

@jmacd jmacd closed this Mar 5, 2026