feat: initial implementation of route to pipeline stage #1786

Merged
albertlockett merged 16 commits into open-telemetry:main from albertlockett:albert/route-to
Jan 14, 2026

Member

@albertlockett albertlockett commented Jan 14, 2026

Related to #1784

Adds an operator to the columnar query engine that can be used to route an OTAP batch to some destination. The main use case is to have the transform processor capable of sending telemetry batches to different out ports, where the behaviour is defined by the query it is executing.

  • A new PipelineStage is implemented called RouterPipelineStage
  • A new data expression type is added to our AST called the OutputDataExpression
  • A new operator is added to the OPL parser, which parses it into the new data expression variant.

Example:

logs
| if (severity_text == "ERROR") {
    route_to "error_port"
} else if (severity_text == "INFO") {
    route_to "info_port"
} // else - route to default out_port
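To make the effect of the query above concrete, here is a minimal row-oriented sketch of the same decision logic. It is not the PR's implementation (the real engine works on columnar OTAP batches); `LogRecord`, `route_for`, `split_by_route`, and the `"default"` label standing in for the default out port are all assumptions made for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified log record; the real engine operates on columnar batches.
#[derive(Debug, Clone, PartialEq)]
pub struct LogRecord {
    pub severity_text: String,
    pub body: String,
}

/// Placeholder label for the default out port (an assumption for this sketch).
pub const DEFAULT_PORT: &str = "default";

/// Mirrors the if / else-if chain of the example query.
pub fn route_for(record: &LogRecord) -> &'static str {
    match record.severity_text.as_str() {
        "ERROR" => "error_port",
        "INFO" => "info_port",
        _ => DEFAULT_PORT,
    }
}

/// Splits one incoming batch into one group of records per destination.
pub fn split_by_route(records: Vec<LogRecord>) -> BTreeMap<&'static str, Vec<LogRecord>> {
    let mut out: BTreeMap<&'static str, Vec<LogRecord>> = BTreeMap::new();
    for record in records {
        out.entry(route_for(&record)).or_default().push(record);
    }
    out
}
```

The point of the sketch is only that a single incoming batch can end up split across several destinations, which is why Ack/Nack handling needs extra care.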

Some additional notes on the design:

Routing implementation is pluggable:
Although the main use case is to direct the batches to some out port, I didn't want to couple the implementation of the columnar query engine to the DF pipeline. This means I didn't want code in the query-engine crate referencing things that handle pdata routing like EffectHandler or message::Sender from the engine crate.

In general, I imagine use cases where pipelines driven by OPL could be executed in a variety of contexts and may need to route data to a variety of destinations.

To make the routing behaviour customizable, the pipeline::router module exposes a Router trait which users of the columnar query-engine can implement.
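As a rough illustration of this decoupling, a pluggable routing trait might look like the sketch below. The method name `send`, its signature, the `Batch` stand-in, and `BufferingRouter` are assumptions for this example and are not taken from the PR; only the idea that the stage depends on a trait rather than on engine-crate types comes from the description above.

```rust
use std::collections::HashMap;

/// Stand-in for an OTAP batch (hypothetical; the real type lives elsewhere).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch(pub Vec<String>);

/// Hypothetical shape of a routing trait: the query engine only knows that it
/// can hand a batch to a named route, not where that route leads.
pub trait Router {
    fn send(&mut self, route_name: &str, batch: Batch) -> Result<(), String>;
}

/// One possible implementation: buffer batches per route so the host
/// (for example a processor) can drain and forward them later.
#[derive(Default)]
pub struct BufferingRouter {
    pub routed: HashMap<String, Vec<Batch>>,
}

impl Router for BufferingRouter {
    fn send(&mut self, route_name: &str, batch: Batch) -> Result<(), String> {
        self.routed
            .entry(route_name.to_string())
            .or_default()
            .push(batch);
        Ok(())
    }
}

/// A pipeline stage depends only on the trait object, never on a concrete router.
pub fn route_stage(
    router: &mut dyn Router,
    route_name: &str,
    batch: Batch,
) -> Result<(), String> {
    router.send(route_name, batch)
}
```

A different host could supply another implementation (writing to a file, a channel, or a test collector) without any change to the stage.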

Extensions & Execution State:
RouterPipelineStage will need to be able to find the implementation of Router. This PR adds the concept of ExecutionState and "extensions", which are a map of instances of types that pipeline stages may need during their execution.

The benefit of this "extension" pattern is that it helps improve future extensibility. For example, we could imagine users may eventually implement custom PipelineStages, which have external dependencies that need to be injected at runtime. Having these "extension"s available makes this possible.

The concept of "extensions" is similar to datafusion's SessionConfig extensions, but having our own implementation provides us with some benefits: our pipeline stages execute in a single-threaded runtime, so extension types don't need to be Send + Sync and can be accessed mutably.

The ExecutionState as a concept also has some auxiliary benefits beyond simply being a repository of extensions. In the future, there may be other mutable state that needs to be updated by pipeline stages such as metrics or state related to stream processing. Introducing this type now is the foundation for these future features.
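The extension idea can be sketched as a map keyed by type. This is a simplified illustration under stated assumptions, not the PR's code: the struct below reuses the name `ExecutionState` for readability, but its fields and the `set_extension` / `get_extension_mut` signatures are hypothetical, and `Counter` and `stage_bump` exist only for the example.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical type-keyed extension map. Values are `Box<dyn Any>` with no
/// `Send + Sync` bound, which is acceptable on a single-threaded runtime.
#[derive(Default)]
pub struct ExecutionState {
    extensions: HashMap<TypeId, Box<dyn Any>>,
}

impl ExecutionState {
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers (or replaces) the extension stored under type `T`.
    pub fn set_extension<T: Any>(&mut self, value: T) {
        self.extensions.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns mutable access to the extension of type `T`, if one was registered.
    pub fn get_extension_mut<T: Any>(&mut self) -> Option<&mut T> {
        self.extensions
            .get_mut(&TypeId::of::<T>())?
            .downcast_mut::<T>()
    }
}

/// Example extension: mutable state that a stage updates while executing.
pub struct Counter(pub u64);

/// A stage looks up its dependency at execution time and mutates it in place.
/// Returns false when the extension has not been injected.
pub fn stage_bump(state: &mut ExecutionState) -> bool {
    match state.get_extension_mut::<Counter>() {
        Some(counter) => {
            counter.0 += 1;
            true
        }
        None => false,
    }
}
```

Because lookups go through `&mut self`, a stage gets plain mutable access without locks or interior mutability, which is the benefit the description attributes to the single-threaded design.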

Followups:

  • Ack/Nack will be handled in a followup PR. Since this kind of conditional routing splits the batch, we need to juggle incoming/outgoing contexts (much like the batch processor).
  • RouteToPipelineStage emits an empty batch after the incoming batch is sent elsewhere. It's forced to do this by the trait signature of PipelineStage. This is OK for now, but in the future we probably want to introduce the concept of a "terminal pipeline stage" as a special type of pipeline stage that consumes the batch.
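The second followup can be illustrated with a small sketch. The trait below is an assumed, simplified signature (the real PipelineStage trait is not shown in this PR description), and `RouteToSketch` is a hypothetical stage that records what it sent instead of calling a real router.

```rust
/// Simplified stand-in for a batch of telemetry rows.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Batch {
    pub rows: Vec<String>,
}

/// Assumed stage signature: every stage must return a batch for the next stage.
pub trait PipelineStage {
    fn execute(&mut self, batch: Batch) -> Result<Batch, String>;
}

/// Hypothetical routing stage that records what it sent and where.
pub struct RouteToSketch {
    pub route_name: String,
    pub sent: Vec<(String, Batch)>,
}

impl PipelineStage for RouteToSketch {
    fn execute(&mut self, batch: Batch) -> Result<Batch, String> {
        // The incoming batch is handed off to the named route...
        self.sent.push((self.route_name.clone(), batch));
        // ...but the signature still requires a batch, so an empty one is returned.
        Ok(Batch::default())
    }
}
```

A "terminal" stage would presumably have a signature that takes ownership of the batch and returns nothing, which would remove the need for the empty placeholder.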

@albertlockett albertlockett requested a review from a team as a code owner January 14, 2026 18:46
@github-actions github-actions Bot added rust Pull requests that update Rust code query-engine Query Engine / Transform related tasks query-engine-columnar Columnar query engine which uses DataFusion to process OTAP Batches query-engine-recordset Reference query engine implementation processing over a set of records labels Jan 14, 2026
@albertlockett albertlockett changed the title Albert/route to feat: initial implementation of route to pipeline stage Jan 14, 2026

codecov Bot commented Jan 14, 2026

Codecov Report

❌ Patch coverage is 89.17749% with 50 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.17%. Comparing base (f72798b) to head (16446e4).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff            @@
##             main    #1786    +/-   ##
========================================
  Coverage   84.16%   84.17%            
========================================
  Files         482      485     +3     
  Lines      140170   140617   +447     
========================================
+ Hits       117979   118369   +390     
- Misses      21657    21714    +57     
  Partials      534      534            
Components Coverage Δ
otap-dataflow 85.39% <91.54%> (+0.01%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 90.52% <68.75%> (-0.02%) ⬇️
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)

Contributor

@lquerel lquerel left a comment

I made a few non-blocking suggestions.

Comment thread rust/experimental/query_engine/expressions/src/data_expressions.rs Outdated
Comment on lines +123 to +124
let mut execution_state = ExecutionState::new();
execution_state.set_extension::<RouterExtType>(Box::new(RouterImpl::new()));
Contributor

I like the concept.

Comment on lines +167 to +174
.find(|p| p.as_ref() == route_name.as_str())
.ok_or_else(|| EngineError::ProcessorError {
processor: effect_handler.processor_id(),
kind: ProcessorErrorKind::Transport,
error: "Routing error: ".into(),
source_detail: format!("out_port name {} not configured", route_name),
})?
.clone();
Contributor

Why not use the method send_message_to and check the error?

Member Author

send_message_to takes as an arg P: Into<PortName> where type PortName = Cow<'static, str>:

#[inline]
pub async fn send_message_to<P>(&self, port: P, data: PData) -> Result<(), TypedError<PData>>
where
P: Into<PortName>,
{

/// The name of a node out port in the pipeline.
pub type PortName = Cow<'static, str>;

But the route_name here is an Rc<String> that might not have the 'static lifetime, so I think to make this into a PortName I'd need to copy it, which I was trying to avoid.
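The trade-off can be shown in isolation. In this sketch only the `PortName` alias comes from the thread; `to_port_name` and `find_port` are hypothetical helpers. Building a `Cow<'static, str>` from an `Rc<String>` forces an owned copy of the string, while comparing against the already-configured port names needs only a borrow.

```rust
use std::borrow::Cow;
use std::rc::Rc;

/// Same alias as quoted in the thread.
pub type PortName = Cow<'static, str>;

/// Converting an `Rc<String>` into a `PortName` allocates a new `String`,
/// because the data behind the `Rc` is not known to live for `'static`.
pub fn to_port_name(route_name: &Rc<String>) -> PortName {
    Cow::Owned(route_name.as_str().to_owned())
}

/// Alternative without copying the route name: look up the matching
/// configured port by string comparison and reuse that `PortName`.
pub fn find_port<'a>(configured: &'a [PortName], route_name: &Rc<String>) -> Option<&'a PortName> {
    configured.iter().find(|p| {
        let name: &str = p.as_ref();
        name == route_name.as_str()
    })
}
```

Cloning the found `PortName` stays cheap when the configured name is a `Cow::Borrowed` static string, and otherwise costs a copy of the configured name rather than of the route name.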

Comment thread rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs Outdated
{
Ok(otap_batch) => {
self.metrics.msgs_transformed.inc();
self.handle_routed_messages(effect_handler).await?;
Member

@lalitb lalitb Jan 14, 2026

This will drain/send the routed batches only when the pipeline execution succeeds. What if the pipeline fails? These routed batches will remain cached and leak into the next successful run. Should we be clearing the router in the Err branch below (line 247)?

Member Author

Thanks @lalitb. Yes you're right, I think it's reasonable to try to handle the routed batches in the error branch as well. Made this change in 16446e4
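The concern in this thread can be sketched as follows, assuming a router that buffers routed batches while the pipeline runs. `BufferingRouter`, `run_pipeline`, and `process` are hypothetical names, batches are plain strings, and forwarding on failure (rather than discarding) is one possible reading of the fix, not a description of commit 16446e4.

```rust
/// Hypothetical router that buffers (route, payload) pairs during execution.
#[derive(Default)]
pub struct BufferingRouter {
    pending: Vec<(String, String)>,
}

impl BufferingRouter {
    pub fn route(&mut self, route_name: &str, payload: &str) {
        self.pending.push((route_name.to_string(), payload.to_string()));
    }

    /// Takes everything buffered so far and leaves the router empty.
    pub fn drain(&mut self) -> Vec<(String, String)> {
        std::mem::take(&mut self.pending)
    }
}

/// Stand-in pipeline: routes its input, then fails if the input contains "bad".
fn run_pipeline(router: &mut BufferingRouter, input: &str) -> Result<(), String> {
    router.route("error_port", input);
    if input.contains("bad") {
        return Err("pipeline failed".to_string());
    }
    Ok(())
}

/// Drains the router on both the Ok and the Err path, so nothing buffered
/// during a failed run can leak into the next run.
pub fn process(
    router: &mut BufferingRouter,
    input: &str,
    sent: &mut Vec<(String, String)>,
) -> Result<(), String> {
    let result = run_pipeline(router, input);
    sent.extend(router.drain());
    result
}
```

If the intended behaviour on failure were to drop the buffered batches instead, the same structure applies: call `drain` in both branches and discard the result on the error path.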

@albertlockett albertlockett added this pull request to the merge queue Jan 14, 2026
Merged via the queue into open-telemetry:main with commit 8db8851 Jan 14, 2026
43 checks passed
@albertlockett albertlockett deleted the albert/route-to branch January 14, 2026 23:49
Comment on lines +354 to +359
/// Output data to a sink identified by name.
/// Currently this contains a static string because it's the only way we handle identifying
/// where to output the data. In the future we could support dynamic sink identified by a
/// variable, result of a function call, or other some expression, at which point we can change
/// this to contain the more general `StaticExpression`.
NamedSink(StringScalarExpression),
Member

@albertlockett I just noticed this. Sorry for the late feedback but here it is...

  1. "Sink": what is a sink in the context of the AST? Maybe "NamedDestination" would be better. Remember, it is an explicit goal that this AST\code NOT be coupled to the collector.

  2. Current design seems very limited. As far as routing is concerned, I can see different needs. Fan out\send\split, stuff like that.

    Consider something like this:

    enum RouteDataExpression {
      MapTo(RouteDestinationExpression), // Send the data and quit
      SplitTo(RouteDestinationExpression) // Send a copy of the data and continue on
    }
    
    enum RouteDestinationExpression {
       Named(StringScalarExpression)
    }

Member Author

@albertlockett albertlockett Mar 6, 2026

re point 1: imo "sink" is kind of a synonym for "destination". I don't see how one is any more coupled to the collector than the other, but I'm fine to change it to destination if you think it's a better name. I'm not personally hung up on the naming one way or the other.

re point 2: the intention of the output expression was simply that the data would be emitted to some destination, and that the variants of the enum would control how the destination is defined. We could change the design to have the expression variants control how data is routed in the pipeline, but that could also be achieved through other types of data expressions. For example, we could imagine a fork with multiple branches as child expressions, which could achieve the "copy the data and continue on" scenario in your example.

Member

Sounds like we need a design discussion 😄

Member Author

Yeah, that'd be great! We can discuss during Wednesday's SIG meeting.

Labels

query-engine Query Engine / Transform related tasks query-engine-columnar Columnar query engine which uses DataFusion to process OTAP Batches query-engine-recordset Reference query engine implementation processing over a set of records rust Pull requests that update Rust code

Projects

Status: Done

4 participants