Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
879 changes: 879 additions & 0 deletions docs/extension-system-architecture.md

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ experimental-tls = ["otap-df-otap/experimental-tls", "dep:rustls"]
contrib-exporters = ["otap-df-contrib-nodes/contrib-exporters"]
geneva-exporter = ["otap-df-contrib-nodes/geneva-exporter"]
azure-monitor-exporter = ["otap-df-contrib-nodes/azure-monitor-exporter"]
# Contrib extensions (opt-in) - now in contrib-nodes
contrib-extensions = ["otap-df-contrib-nodes/contrib-extensions"]
azure-identity-auth-extension = ["otap-df-contrib-nodes/azure-identity-auth-extension"]
# Contrib processors (opt-in) - now in contrib-nodes
contrib-processors = ["otap-df-contrib-nodes/contrib-processors"]
condense-attributes-processor = ["otap-df-contrib-nodes/condense-attributes-processor"]
Expand Down Expand Up @@ -284,6 +287,14 @@ module_name_repetitions = "allow"
broken_intra_doc_links = "deny"
missing_crate_level_docs = "deny"

# Optimize third-party dependencies even in debug builds.
# Without this, pure-Rust implementations of compression (miniz_oxide/flate2),
# serialization (serde_json), and protobuf decoding (prost) run at opt-level 0,
# which can be 100-500x slower than optimized code — making debug builds
# unusable for any real workload testing.
[profile.dev.package."*"]
opt-level = 2

[profile.release]
debug = "line-tables-only" # minimum required for profiling

Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ pub enum NodeKind {
Processor,
/// A sink of signals
Exporter,
/// A provider of shared capabilities (e.g., auth, service discovery).
Extension,

// ToDo(LQ) : Add more node kinds as needed.
// A connector between two pipelines
Expand All @@ -102,6 +104,7 @@ impl From<NodeKind> for Cow<'static, str> {
NodeKind::Receiver => "receiver".into(),
NodeKind::Processor => "processor".into(),
NodeKind::Exporter => "exporter".into(),
NodeKind::Extension => "extension".into(),
NodeKind::ProcessorChain => "processor_chain".into(),
}
}
Expand Down
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/config/src/node_urn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ const fn kind_suffix(expected_kind: NodeKind) -> &'static str {
NodeKind::Receiver => "receiver",
NodeKind::Processor | NodeKind::ProcessorChain => "processor",
NodeKind::Exporter => "exporter",
NodeKind::Extension => "extension",
}
}

Expand All @@ -228,9 +229,12 @@ fn parse_kind(raw: &str, kind: &str) -> Result<NodeKind, Error> {
"receiver" => Ok(NodeKind::Receiver),
"processor" => Ok(NodeKind::Processor),
"exporter" => Ok(NodeKind::Exporter),
"extension" => Ok(NodeKind::Extension),
_ => Err(invalid_plugin_urn(
raw,
format!("expected kind `receiver`, `processor`, or `exporter`, found `{kind}`"),
format!(
"expected kind `receiver`, `processor`, `exporter`, or `extension`, found `{kind}`"
),
)),
}
}
Expand Down
79 changes: 73 additions & 6 deletions rust/otap-dataflow/crates/config/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,26 @@ pub struct PipelineConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
policies: Option<Policies>,

/// All nodes in this pipeline, keyed by node ID.
/// All data-path nodes in this pipeline, keyed by node ID.
///
/// This includes receivers, processors, and exporters — but NOT extensions.
/// Extensions are configured in the sibling `extensions` section.
#[serde(default)]
nodes: PipelineNodes,

/// Pipeline extensions, keyed by extension ID.
///
/// Extensions are long-lived components that run alongside the pipeline and
/// expose functionality (e.g., authentication, service discovery) to other
/// components. Unlike nodes, extensions do NOT participate in data-path
/// connections.
#[serde(default, skip_serializing_if = "PipelineNodes::is_empty")]
extensions: PipelineNodes,

/// Explicit graph connections between nodes.
///
/// When provided, these connections are used as the authoritative topology for
/// the main pipeline graph.
/// the main pipeline graph. Extensions are not part of connections.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
connections: Vec<PipelineConnection>,
}
Expand Down Expand Up @@ -474,17 +486,28 @@ impl PipelineConfig {
self.policies.as_ref()
}

/// Returns a reference to the main pipeline nodes.
/// Returns a reference to the main pipeline nodes (receivers, processors, exporters).
#[must_use]
pub const fn nodes(&self) -> &PipelineNodes {
&self.nodes
}

/// Returns an iterator visiting all nodes in the pipeline.
/// Returns a reference to the pipeline extensions.
#[must_use]
pub const fn extensions(&self) -> &PipelineNodes {
&self.extensions
}

/// Returns an iterator visiting all data-path nodes in the pipeline.
pub fn node_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> {
self.nodes.iter()
}

/// Returns an iterator visiting all extension nodes in the pipeline.
pub fn extension_iter(&self) -> impl Iterator<Item = (&NodeId, &Arc<NodeUserConfig>)> {
self.extensions.iter()
}

/// Returns true if the pipeline graph is defined with top-level connections.
#[must_use]
pub fn has_connections(&self) -> bool {
Expand All @@ -496,11 +519,16 @@ impl PipelineConfig {
self.connections.iter()
}

/// Creates a consuming iterator over the nodes in the pipeline.
/// Creates a consuming iterator over the data-path nodes in the pipeline.
pub fn node_into_iter(self) -> impl Iterator<Item = (NodeId, Arc<NodeUserConfig>)> {
self.nodes.into_iter()
}

/// Creates a consuming iterator over the extensions in the pipeline.
pub fn extension_into_iter(self) -> impl Iterator<Item = (NodeId, Arc<NodeUserConfig>)> {
self.extensions.into_iter()
}

/// Remove unconnected nodes from the main pipeline graph and return removed node descriptors.
///
/// Connectivity is defined by top-level `connections`:
Expand All @@ -526,6 +554,8 @@ impl PipelineConfig {
!has_incoming || !has_outgoing
}
NodeKind::Exporter => !has_incoming,
// Extensions are in a separate section and never appear in `nodes`.
NodeKind::Extension => false,
};

if should_remove {
Expand Down Expand Up @@ -590,17 +620,20 @@ impl PipelineConfig {
r#type: PipelineType::Otap,
policies,
nodes,
extensions: PipelineNodes::default(),
connections,
}
}

/// Normalize plugin URNs for pipeline nodes.
/// Normalize plugin URNs for pipeline nodes and extensions.
fn canonicalize_plugin_urns(
&mut self,
pipeline_group_id: &PipelineGroupId,
pipeline_id: &PipelineId,
) -> Result<(), Error> {
self.nodes
.canonicalize_plugin_urns(pipeline_group_id, pipeline_id)?;
self.extensions
.canonicalize_plugin_urns(pipeline_group_id, pipeline_id)
}

Expand Down Expand Up @@ -878,6 +911,7 @@ fn prune_connection(
pub struct PipelineConfigBuilder {
description: Option<Description>,
nodes: HashMap<NodeId, NodeUserConfig>,
extensions: HashMap<NodeId, NodeUserConfig>,
duplicate_nodes: Vec<NodeId>,
pending_connections: Vec<PendingConnection>,
}
Expand All @@ -896,6 +930,7 @@ impl PipelineConfigBuilder {
Self {
description: None,
nodes: HashMap::new(),
extensions: HashMap::new(),
duplicate_nodes: Vec::new(),
pending_connections: Vec::new(),
}
Expand Down Expand Up @@ -966,6 +1001,33 @@ impl PipelineConfigBuilder {
self.add_node(id, node_type, config)
}

/// Add an extension (configured as a sibling to nodes, not as a node).
pub fn add_extension<S: Into<NodeId>, U: Into<NodeUrn>>(
mut self,
id: S,
node_type: U,
config: Option<Value>,
) -> Self {
let id = id.into();
let node_type = node_type.into();
if self.extensions.contains_key(&id) || self.nodes.contains_key(&id) {
self.duplicate_nodes.push(id.clone());
} else {
_ = self.extensions.insert(
id.clone(),
NodeUserConfig {
r#type: node_type,
description: None,
entity: None,
outputs: Vec::new(),
default_output: None,
config: config.unwrap_or(Value::Null),
},
);
}
self
}

/// Connects a source node output port to one or more target nodes
/// with a given dispatch policy.
pub fn connect<S, P, T, I>(
Expand Down Expand Up @@ -1166,6 +1228,11 @@ impl PipelineConfigBuilder {
.into_iter()
.map(|(id, node)| (id, Arc::new(node)))
.collect(),
extensions: self
.extensions
.into_iter()
.map(|(id, node)| (id, Arc::new(node)))
.collect(),
connections: built_connections,
policies: None,
r#type: pipeline_type,
Expand Down
8 changes: 8 additions & 0 deletions rust/otap-dataflow/crates/contrib-nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ contrib-exporters = [
"geneva-exporter",
"azure-monitor-exporter",
]
contrib-extensions = [
"azure-identity-auth-extension",
]
azure-identity-auth-extension = [
"dep:azure_identity",
"dep:azure_core",
"dep:rand",
]
geneva-exporter = [
"dep:geneva-uploader",
"dep:opentelemetry-proto",
Expand Down
Loading