Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions rust/otap-dataflow/benchmarks/benches/exporter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use tonic::{Request, Response, Status};
use otap_df_config::node::NodeUserConfig;
use otap_df_engine::context::ControllerContext;
use otap_df_engine::control::{Controllable, NodeControlMsg, pipeline_ctrl_msg_channel};
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_otap::otap_exporter::OTAP_EXPORTER_URN;
use otap_df_otap::otlp_grpc::OTLPData;
use otap_df_otap::perf_exporter::exporter::OTAP_PERF_EXPORTER_URN;
Expand Down Expand Up @@ -454,7 +455,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -520,7 +521,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down Expand Up @@ -591,7 +592,7 @@ fn bench_exporter(c: &mut Criterion) {
let local = LocalSet::new();
let _run_exporter_handle = local.spawn_local(async move {
exporter
.start(node_req_tx, metrics_reporter)
.start(node_req_tx, metrics_reporter, ExtensionRegistry::empty())
.await
.expect("Exporter event loop failed")
});
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub enum NodeKind {
// Connector,
/// A merged chain of consecutive processors (experimental).
ProcessorChain,
/// A non-pipeline extension (e.g., auth provider, health check).
Extension,
}

impl From<NodeKind> for Cow<'static, str> {
Expand All @@ -103,6 +105,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
NodeKind::Extension => "extension".into(),
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

Expand All @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
format!(
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
),
)),
}
}
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,9 @@ impl PipelineConfig {
!has_incoming || !has_outgoing
}
NodeKind::Exporter => !has_incoming,
// Extensions are standalone services; they never participate
// in the data-flow graph and must not be pruned.
NodeKind::Extension => false,
};

if should_remove {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use otap_df_engine::ConsumerEffectHandlerExtension;
use otap_df_engine::context::PipelineContext;
use otap_df_engine::control::{AckMsg, NackMsg, NodeControlMsg};
use otap_df_engine::error::Error as EngineError;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::terminal_state::TerminalState;
Expand Down Expand Up @@ -462,6 +463,7 @@ impl Exporter<OtapPdata> for AzureMonitorExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, EngineError> {
effect_handler
.info(&format!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use otap_df_engine::control::NodeControlMsg;
use otap_df_engine::control::{AckMsg, NackMsg};
use otap_df_engine::error::Error;
use otap_df_engine::exporter::ExporterWrapper;
use otap_df_engine::extensions::ExtensionRegistry;
use otap_df_engine::local::exporter::{EffectHandler, Exporter};
use otap_df_engine::message::{Message, MessageChannel};
use otap_df_engine::node::NodeId;
Expand Down Expand Up @@ -501,6 +502,7 @@ impl Exporter<OtapPdata> for GenevaExporter {
mut self: Box<Self>,
mut msg_chan: MessageChannel<OtapPdata>,
effect_handler: EffectHandler<OtapPdata>,
_extension_registry: ExtensionRegistry,
) -> Result<TerminalState, Error> {
otel_info!(
"geneva_exporter.start",
Expand Down
15 changes: 15 additions & 0 deletions rust/otap-dataflow/crates/engine-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
let receiver_factories_name = quote::format_ident!("{}_RECEIVER_FACTORIES", prefix);
let processor_factories_name = quote::format_ident!("{}_PROCESSOR_FACTORIES", prefix);
let exporter_factories_name = quote::format_ident!("{}_EXPORTER_FACTORIES", prefix);
let extension_factories_name = quote::format_ident!("{}_EXTENSION_FACTORIES", prefix);
let get_receiver_factory_map_name = quote::format_ident!(
"get_{}_receiver_factory_map",
prefix.to_string().to_lowercase()
Expand All @@ -87,6 +88,10 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
"get_{}_exporter_factory_map",
prefix.to_string().to_lowercase()
);
let get_extension_factory_map_name = quote::format_ident!(
"get_{}_extension_factory_map",
prefix.to_string().to_lowercase()
);

let output = quote! {
/// A slice of receiver factories.
Expand All @@ -101,6 +106,10 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
#[::otap_df_engine::distributed_slice]
pub static #exporter_factories_name: [::otap_df_engine::ExporterFactory<#pdata_type>] = [..];

/// A slice of extension factories.
#[::otap_df_engine::distributed_slice]
pub static #extension_factories_name: [::otap_df_engine::ExtensionFactory] = [..];

/// The factory registry instance.
#registry_vis static #registry_name: std::sync::LazyLock<PipelineFactory<#pdata_type>> = std::sync::LazyLock::new(|| {
// Reference build_registry to avoid unused import warning, even though we don't call it
Expand All @@ -109,6 +118,7 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
&#receiver_factories_name,
&#processor_factories_name,
&#exporter_factories_name,
&#extension_factories_name,
)
});

Expand All @@ -126,6 +136,11 @@ pub fn pipeline_factory(args: TokenStream, input: TokenStream) -> TokenStream {
pub fn #get_exporter_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExporterFactory<#pdata_type>> {
#registry_name.get_exporter_factory_map()
}

/// Gets the extension factory map, initializing it if necessary.
pub fn #get_extension_factory_map_name() -> &'static std::collections::HashMap<&'static str, ::otap_df_engine::ExtensionFactory> {
#registry_name.get_extension_factory_map()
}
};

output.into()
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ data-encoding = { workspace = true }
prost = { workspace = true }
byte-unit = { workspace = true }
cpu-time = { workspace = true }
http = { workspace = true }
nix = { workspace = true, features = ["resource"] }

[target.'cfg(not(windows))'.dependencies]
Expand Down
14 changes: 5 additions & 9 deletions rust/otap-dataflow/crates/engine/src/channel_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use crate::channel_metrics::{
ChannelSenderMetrics, control_channel_id,
};
use crate::context::PipelineContext;
use crate::control::NodeControlMsg;
use crate::entity_context::current_node_telemetry_handle;
use crate::local::message::{LocalReceiver, LocalSender};
use crate::shared::message::{SharedReceiver, SharedSender};
Expand Down Expand Up @@ -171,24 +170,21 @@ impl ChannelMode for SharedMode {
}
}

/// Generic helper used by receiver, processor, and exporter wrappers.
/// Generic helper used by receiver, processor, exporter, and extension wrappers.
/// It keeps local and shared wiring identical while still emitting mode-specific code.
///
/// The logic first attempts to unwrap the inner MPSC channel so metrics can be attached.
/// If the channel is already wrapped, it preserves the existing wrapper to avoid double
/// instrumentation.
pub(crate) fn wrap_control_channel_metrics<M, PData>(
pub(crate) fn wrap_control_channel_metrics<M, T>(
node_id: &crate::node::NodeId,
pipeline_ctx: &PipelineContext,
channel_metrics: &mut ChannelMetricsRegistry,
channel_metrics_enabled: bool,
capacity: u64,
control_sender: M::ControlSender<NodeControlMsg<PData>>,
control_receiver: M::ControlReceiver<NodeControlMsg<PData>>,
) -> (
M::ControlSender<NodeControlMsg<PData>>,
M::ControlReceiver<NodeControlMsg<PData>>,
)
control_sender: M::ControlSender<T>,
control_receiver: M::ControlReceiver<T>,
) -> (M::ControlSender<T>, M::ControlReceiver<T>)
where
M: ChannelMode,
{
Expand Down
36 changes: 36 additions & 0 deletions rust/otap-dataflow/crates/engine/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ pub struct ExporterConfig {
pub input_pdata_channel: PdataChannelConfig,
}

/// Generic configuration for an extension.
///
/// Extensions are non-pipeline components (e.g., auth providers, health checks)
/// that only receive control messages. They do not participate in pdata flow.
#[derive(Clone, Debug)]
pub struct ExtensionConfig {
/// Name of the extension.
pub name: NodeId,
/// Configuration for control channel.
pub control_channel: ControlChannelConfig,
}

impl ReceiverConfig {
/// Creates a new receiver configuration with default channel capacities.
pub fn new<T>(name: T) -> Self
Expand Down Expand Up @@ -172,3 +184,27 @@ impl ExporterConfig {
}
}
}

impl ExtensionConfig {
/// Creates a new extension configuration with default channel capacities.
pub fn new<T>(name: T) -> Self
where
T: Into<NodeId>,
{
Self::with_channel_capacity(name, DEFAULT_CONTROL_CHANNEL_CAPACITY)
}

/// Creates a new extension configuration with an explicit control channel capacity.
#[must_use]
pub fn with_channel_capacity<T>(name: T, control_channel_capacity: usize) -> Self
where
T: Into<NodeId>,
{
ExtensionConfig {
name: name.into(),
control_channel: ControlChannelConfig {
capacity: control_channel_capacity,
},
}
}
}
38 changes: 38 additions & 0 deletions rust/otap-dataflow/crates/engine/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,44 @@ pub enum NodeControlMsg<PData> {
},
}

/// Control messages sent by the pipeline engine to **extensions**.
///
/// This is a PData-free subset of [`NodeControlMsg`] — extensions never process
/// Ack/Nack/DelayedData, so they receive a simpler, non-generic enum.
#[derive(Debug, Clone)]
pub enum ExtensionControlMsg {
/// Notifies the extension of a configuration change.
Config {
/// The new configuration as a JSON value.
config: serde_json::Value,
},

/// Emitted when a scheduled timer expires.
TimerTick {},

/// Signal to collect/flush local telemetry metrics.
CollectTelemetry {
/// Metrics reporter used to collect telemetry metrics.
metrics_reporter: MetricsReporter,
},

/// Requests a graceful shutdown.
Shutdown {
/// Deadline for shutdown.
deadline: Instant,
/// Human-readable reason for the shutdown.
reason: String,
},
}

impl ExtensionControlMsg {
/// Returns `true` if this control message is a shutdown request.
#[must_use]
pub const fn is_shutdown(&self) -> bool {
matches!(self, ExtensionControlMsg::Shutdown { .. })
}
}

/// Control messages sent by nodes to the pipeline engine to manage node-specific operations
/// and control pipeline behavior.
#[derive(Debug, Clone)]
Expand Down
Loading
Loading