Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@
"rust/experimental/query_abstraction/Cargo.toml",
"rust/experimental/query_engine/Cargo.toml"
],

// Enable specific features for rust-analyzer
"rust-analyzer.cargo.features": [
"experimental-exporters",
"geneva-exporter",
"azure-monitor-exporter",
"azure-identity-auth-extension"
],

// Exclude Rust build artifacts from file watching and search
// to improve performance and reduce noise in search results.
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ experimental-tls = ["otap-df-otap/experimental-tls", "dep:rustls"]
experimental-exporters = ["otap-df-otap/experimental-exporters"]
geneva-exporter = ["otap-df-otap/geneva-exporter"]
azure-monitor-exporter = ["otap-df-otap/azure-monitor-exporter"]
azure-identity-auth-extension = ["otap-df-otap/azure-identity-auth-extension"]
# Experimental processors (opt-in)
experimental-processors = ["otap-df-otap/experimental-processors"]
condense-attributes-processor = ["otap-df-otap/condense-attributes-processor"]
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// An extension providing auxiliary services (no signal processing)
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
Expand All @@ -118,6 +120,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/config/src/urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub fn validate_plugin_urn(raw: &str, expected_kind: NodeKind) -> Result<(), Err
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
};
if last != expected_suffix {
return Err(Error::InvalidUserConfig {
Expand Down
19 changes: 17 additions & 2 deletions rust/otap-dataflow/crates/engine-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Parse for PipelineFactoryArgs {
/// The individual factory types are imported internally by the macro.
///
/// This generates:
/// - Distributed slices for receiver, processor, and exporter factories (prefixed)
/// - Distributed slices for receiver, processor, exporter, and extension factories (prefixed)
/// - Proper initialization of the FACTORY_REGISTRY with lazy loading
/// - Helper functions to access factory maps (prefixed)
#[proc_macro_attribute]
Expand All @@ -75,6 +75,7 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
let receiver_factories_name = quote::format_ident!("{}_RECEIVER_FACTORIES", prefix);
let processor_factories_name = quote::format_ident!("{}_PROCESSOR_FACTORIES", prefix);
let exporter_factories_name = quote::format_ident!("{}_EXPORTER_FACTORIES", prefix);
let extension_factories_name = quote::format_ident!("{}_EXTENSION_FACTORIES", prefix);
let get_receiver_factory_map_name = quote::format_ident!(
"get_{}_receiver_factory_map",
prefix.to_string().to_lowercase()
Expand All @@ -87,6 +88,10 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
"get_{}_exporter_factory_map",
prefix.to_string().to_lowercase()
);
let get_extension_factory_map_name = quote::format_ident!(
"get_{}_extension_factory_map",
prefix.to_string().to_lowercase()
);

let output = quote! {
/// A slice of receiver factories.
Expand All @@ -101,14 +106,19 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
#[::otap_df_engine::distributed_slice]
pub static #exporter_factories_name: [::otap_df_engine::ExporterFactory<#pdata_type>] = [..];

/// A slice of extension factories.
#[::otap_df_engine::distributed_slice]
pub static #extension_factories_name: [::otap_df_engine::ExtensionFactory<#pdata_type>] = [..];

/// The factory registry instance.
#registry_vis static #registry_name: std::sync::LazyLock<PipelineFactory<#pdata_type>> = std::sync::LazyLock::new(|| {
// Reference build_registry to avoid unused import warning, even though we don't call it
let _ = build_factory::<#pdata_type>;
PipelineFactory::new(
PipelineFactory::with_extensions(
&#receiver_factories_name,
&#processor_factories_name,
&#exporter_factories_name,
&#extension_factories_name,
)
});

Expand All @@ -126,6 +136,11 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
pub fn #get_exporter_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExporterFactory<#pdata_type>> {
#registry_name.get_exporter_factory_map()
}

/// Gets the extension factory map, initializing it if necessary.
pub fn #get_extension_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExtensionFactory<#pdata_type>> {
#registry_name.get_extension_factory_map()
}
};

output.into()
Expand Down
28 changes: 28 additions & 0 deletions rust/otap-dataflow/crates/engine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ pub struct ExporterConfig {
pub input_pdata_channel: PdataChannelConfig,
}

/// Generic configuration for an extension.
///
/// Extensions are special components that don't process pdata, so they only have
/// a control channel configuration.
#[derive(Clone, Debug)]
pub struct ExtensionConfig {
/// Name of the extension.
pub name: NodeId,
/// Configuration for control channel.
pub control_channel: ControlChannelConfig,
}

impl ReceiverConfig {
/// Creates a new receiver configuration with the given name and default channel capacity.
pub fn new<T>(name: T) -> Self
Expand Down Expand Up @@ -137,3 +149,19 @@ impl ExporterConfig {
}
}
}

impl ExtensionConfig {
/// Creates a new extension configuration with the given name and default channel capacity.
#[must_use]
pub fn new<T>(name: T) -> Self
where
T: Into<NodeId>,
{
ExtensionConfig {
name: name.into(),
control_channel: ControlChannelConfig {
capacity: DEFAULT_CONTROL_CHANNEL_CAPACITY,
},
}
}
}
43 changes: 43 additions & 0 deletions rust/otap-dataflow/crates/engine/src/effect_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

use crate::control::{AckMsg, NackMsg, PipelineControlMsg, PipelineCtrlMsgSender};
use crate::error::Error;
use crate::extensions::ExtensionTrait;
use crate::extensions::registry::{ExtensionError, ExtensionRegistry};
use crate::node::NodeId;
use otap_df_channel::error::SendError;
use otap_df_telemetry::error::Error as TelemetryError;
Expand All @@ -25,6 +27,8 @@ pub(crate) struct EffectHandlerCore<PData> {
#[allow(dead_code)]
// Will be used in the future. ToDo report metrics from channel and messages.
pub(crate) metrics_reporter: MetricsReporter,
/// Registry of extension traits for capability lookup.
pub(crate) extension_registry: Option<ExtensionRegistry>,
}

impl<PData> EffectHandlerCore<PData> {
Expand All @@ -34,6 +38,7 @@ impl<PData> EffectHandlerCore<PData> {
node_id,
pipeline_ctrl_msg_sender: None,
metrics_reporter,
extension_registry: None,
}
}

Expand All @@ -45,12 +50,50 @@ impl<PData> EffectHandlerCore<PData> {
self.pipeline_ctrl_msg_sender = Some(pipeline_ctrl_msg_sender);
}

/// Sets the extension registry for this effect handler.
pub fn set_extension_registry(&mut self, registry: ExtensionRegistry) {
self.extension_registry = Some(registry);
}

/// Returns the id of the node associated with this effect handler.
#[must_use]
pub(crate) fn node_id(&self) -> NodeId {
self.node_id.clone()
}

/// Gets an extension trait implementation by extension name.
///
/// This allows components to look up capabilities provided by extensions.
///
/// # Type Parameters
///
/// * `T` - The trait type (e.g., `dyn BearerTokenProvider`). Must implement `ExtensionTrait`.
///
/// # Errors
///
/// Returns `ExtensionError::NotFound` if no extension with that name exists or if the
/// extension registry has not been set.
/// Returns `ExtensionError::TraitNotImplemented` if the extension doesn't implement the trait.
///
/// # Example
///
/// ```ignore
/// let token_provider: &dyn BearerTokenProvider = effect_handler
/// .get_extension::<dyn BearerTokenProvider>("azure_auth")?;
/// let token = token_provider.get_token();
/// ```
pub(crate) fn get_extension<T: ExtensionTrait + ?Sized + 'static>(
&self,
name: &str,
) -> Result<&T, ExtensionError> {
match &self.extension_registry {
Some(registry) => registry.get_trait::<T>(name),
None => Err(ExtensionError::NotFound {
name: name.to_string(),
}),
}
}

/// Print an info message to stdout.
///
/// This method provides a standardized way for all nodes in the pipeline
Expand Down
55 changes: 55 additions & 0 deletions rust/otap-dataflow/crates/engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,28 @@ impl fmt::Display for ProcessorErrorKind {
}
}

/// High-level classification for extension failures to aid troubleshooting.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ExtensionErrorKind {
/// Errors caused by invalid or incomplete configuration detected at runtime.
Configuration,
/// Errors raised while shutting down an extension.
Shutdown,
/// Catch-all for extension failures that do not fit other categories.
Other,
}

impl fmt::Display for ExtensionErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let label = match self {
ExtensionErrorKind::Configuration => "configuration",
ExtensionErrorKind::Shutdown => "shutdown",
ExtensionErrorKind::Other => "other",
};
write!(f, "{label}")
}
}

/// Formats the source chain of an error into a single display string.
#[must_use]
pub fn format_error_sources(error: &(dyn std::error::Error + 'static)) -> String {
Expand Down Expand Up @@ -323,6 +345,36 @@ pub enum Error {
plugin_urn: NodeUrn,
},

/// The specified extension already exists in the pipeline.
#[error("The extension `{extension}` already exists")]
ExtensionAlreadyExists {
/// The name of the extension that already exists.
extension: NodeId,
},

/// A wrapper for the extension errors.
#[error("An extension error occurred in node {extension} ({kind}): {error}{source_detail}")]
ExtensionError {
/// The name of the extension that encountered the error.
extension: NodeId,

/// High-level classification for the extension failure.
kind: ExtensionErrorKind,

/// The error that occurred.
error: String,

/// Pre-formatted representation of the source chain used when rendering the error.
source_detail: String,
},

/// Unknown extension plugin.
#[error("Unknown extension plugin `{plugin_urn}`")]
UnknownExtension {
/// The name of the unknown extension plugin.
plugin_urn: NodeUrn,
},

/// Unknown node.
#[error("Unknown node `{node}`")]
UnknownNode {
Expand Down Expand Up @@ -424,6 +476,8 @@ impl Error {
Error::ConfigError(_) => "ConfigError",
Error::ExporterAlreadyExists { .. } => "ExporterAlreadyExists",
Error::ExporterError { .. } => "ExporterError",
Error::ExtensionAlreadyExists { .. } => "ExtensionAlreadyExists",
Error::ExtensionError { .. } => "ExtensionError",
Error::InternalError { .. } => "InternalError",
Error::InvalidHyperEdge { .. } => "InvalidHyperEdge",
Error::IoError { .. } => "IoError",
Expand All @@ -443,6 +497,7 @@ impl Error {
Error::SpmcSharedNotSupported { .. } => "SpmcSharedNotSupported",
Error::TooManyNodes {} => "TooManyNodes",
Error::UnknownExporter { .. } => "UnknownExporter",
Error::UnknownExtension { .. } => "UnknownExtension",
Error::UnknownNode { .. } => "UnknownNode",
Error::UnknownOutPort { .. } => "UnknownPort",
Error::UnknownProcessor { .. } => "UnknownProcessor",
Expand Down
Loading