From 236138b9c36a05cc6ca59d6acab8aaec0382f50d Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 7 Jan 2026 16:09:41 -0500 Subject: [PATCH 01/16] started adding route-to --- .../engine-recordset/src/engine.rs | 8 +++ .../expressions/src/data_expressions.rs | 63 ++++++++++++++++++ .../crates/engine/src/control.rs | 2 +- .../crates/otap/src/transform_processor.rs | 1 + .../otap/src/transform_processor/routing.rs | 29 +++++++++ .../crates/query-engine/src/pipeline.rs | 3 + .../query-engine/src/pipeline/planner.rs | 14 ++-- .../query-engine/src/pipeline/routing.rs | 65 +++++++++++++++++++ 8 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs diff --git a/rust/experimental/query_engine/engine-recordset/src/engine.rs b/rust/experimental/query_engine/engine-recordset/src/engine.rs index 47ebd513be..a5267cc874 100644 --- a/rust/experimental/query_engine/engine-recordset/src/engine.rs +++ b/rust/experimental/query_engine/engine-recordset/src/engine.rs @@ -316,6 +316,14 @@ fn process_record<'a, TRecord: Record + 'static>( ); break; } + DataExpression::Output(c) => { + execution_context.add_diagnostic_if_enabled( + RecordSetEngineDiagnosticLevel::Error, + c, + || "Output Expression not yet supported in record set engine".into(), + ); + break; + } } } diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index fdd1606786..98d43af6b8 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +use std::process::Output; + use crate::*; #[derive(Debug, Clone, PartialEq)] @@ -16,6 +18,9 @@ pub enum DataExpression { /// Conditional data expression. Conditional(ConditionalDataExpression), + + /// Output data expression + Output(OutputDataExpression), } impl DataExpression { @@ -28,6 +33,7 @@ impl DataExpression { DataExpression::Summary(s) => s.try_fold(scope), DataExpression::Transform(t) => t.try_fold(scope), DataExpression::Conditional(c) => c.try_fold(scope), + DataExpression::Output(o) => o.try_fold(scope), } } } @@ -39,6 +45,7 @@ impl Expression for DataExpression { DataExpression::Summary(s) => s.get_query_location(), DataExpression::Transform(t) => t.get_query_location(), DataExpression::Conditional(c) => c.get_query_location(), + DataExpression::Output(o) => o.get_query_location(), } } @@ -48,6 +55,7 @@ impl Expression for DataExpression { DataExpression::Summary(_) => "DataExpression(Summary)", DataExpression::Transform(_) => "DataExpression(Transform)", DataExpression::Conditional(_) => "DataExpression(Conditional)", + DataExpression::Output(_) => "DataExpression(Output)", } } @@ -57,6 +65,7 @@ impl Expression for DataExpression { DataExpression::Summary(s) => s.fmt_with_indent(f, indent), DataExpression::Transform(t) => t.fmt_with_indent(f, indent), DataExpression::Conditional(c) => c.fmt_with_indent(f, indent), + DataExpression::Output(o) => o.fmt_with_indent(f, indent), } } } @@ -295,3 +304,57 @@ impl ConditionalDataExpressionBranch { &self.expressions } } + + +/// TODO comments +#[derive(Debug, Clone, PartialEq)] +pub struct OutputDataExpression { + query_location: QueryLocation, + output: OutputExpression, +} + +impl OutputDataExpression { + pub fn new(query_location: QueryLocation, output: OutputExpression) -> Self { + Self { + query_location, + output, + } + } + + pub fn get_output(&self) -> &OutputExpression { + &self.output + } + + pub fn try_fold( + &mut self, + _scope: &PipelineResolutionScope, + ) -> Result<(), ExpressionError> { + // No folding currently supported for output expressions. + Ok(()) + } +} + +impl Expression for OutputDataExpression { + fn get_query_location(&self) -> &QueryLocation { + &self.query_location + } + + fn get_name(&self) -> &'static str { + "OutputDataExpression" + } + + // TODO test this + fn fmt_with_indent(&self, f: &mut std::fmt::Formatter<'_>, indent: &str) -> std::fmt::Result { + writeln!(f, "Output:")?; + write!(f, "{indent}└── ")?; + match &self.output { + OutputExpression::NamedSink(expr) => expr.fmt_with_indent(f, format!("{indent} ").as_str()), + } + } +} + +/// TODO comments +#[derive(Debug, Clone, PartialEq)] +pub enum OutputExpression { + NamedSink(StringScalarExpression) +} diff --git a/rust/otap-dataflow/crates/engine/src/control.rs b/rust/otap-dataflow/crates/engine/src/control.rs index 1807c15f06..52bae90eaa 100644 --- a/rust/otap-dataflow/crates/engine/src/control.rs +++ b/rust/otap-dataflow/crates/engine/src/control.rs @@ -57,7 +57,7 @@ impl From for f64 { } } -/// Standard context values hold three caller-specified fields. The +/// Standard context values hold three caller-specified fields. The /// size is arbitrary, but shouldn't be larger than needed by /// callers. For example: retry count, sequence and generation /// numbers, deadline, num_items, etc. diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 6deda2cf17..d4b1cf4f36 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -43,6 +43,7 @@ use self::metrics::Metrics; mod config; mod metrics; +mod routing; /// URN for the TransformProcessor pub const TRANSFORM_PROCESSOR_URN: &str = "urn:otel:transform:processor"; diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs new file mode 100644 index 0000000000..f52e73d8ae --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs @@ -0,0 +1,29 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use async_trait::async_trait; +use otap_df_engine::local::processor::EffectHandler; +use otap_df_pdata::{OtapArrowRecords, OtapPayload}; +use otap_df_query_engine::{ + error::{Result, Error}, + pipeline::routing::{RouterProvider, Router} +}; + +use crate::pdata::{Context, OtapPdata}; + +struct RouterImpl { + effect_handler: EffectHandler +} + +#[async_trait] +impl Router for RouterImpl { + async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { + // TODO this isn't the correct handling for context + let pdata = OtapPdata::new( + Context::default(), + OtapPayload::OtapArrowRecords(otap_batch) + ); + self.effect_handler.send_message_to(route_name, pdata).await?; + Ok(()) + } +} \ No newline at end of file diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs index 8b71fc2b96..7d03fc5d97 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs @@ -29,6 +29,9 @@ mod filter; mod functions; mod planner; +/// TODO docs +pub mod routing; + /// A stage in the pipeline. /// /// Used for the physical execution of one or more pipeline expressions. Stages are compiled diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs index 75a7413af0..78783c74f3 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs @@ -4,10 +4,7 @@ //! This module contains code for planning pipeline execution use data_engine_expressions::{ - BooleanValue, DataExpression, DateTimeValue, DoubleValue, Expression, IntegerValue, - LogicalExpression, MapSelector, MoveTransformExpression, MutableValueExpression, - PipelineExpression, ReduceMapTransformExpression, RenameMapKeysTransformExpression, - ScalarExpression, StaticScalarExpression, StringValue, TransformExpression, ValueAccessor, + BooleanValue, DataExpression, DateTimeValue, DoubleValue, Expression, IntegerValue, LogicalExpression, MapSelector, MoveTransformExpression, MutableValueExpression, OutputExpression, PipelineExpression, ReduceMapTransformExpression, RenameMapKeysTransformExpression, ScalarExpression, StaticScalarExpression, StringValue, TransformExpression, ValueAccessor }; use datafusion::logical_expr::{BinaryExpr, Expr, Operator, col, lit}; use datafusion::prelude::{SessionContext, lit_timestamp_nano}; @@ -22,6 +19,7 @@ use crate::pipeline::attributes::AttributeTransformPipelineStage; use crate::pipeline::conditional::{ConditionalPipelineStage, ConditionalPipelineStageBranch}; use crate::pipeline::filter::optimize::AttrsFilterCombineOptimizerRule; use crate::pipeline::filter::{Composite, FilterExec, FilterPipelineStage, FilterPlan}; +use crate::pipeline::routing::RouteToPipelineStage; use crate::pipeline::{BoxedPipelineStage, PipelineStage}; /// Converts an pipeline expression (AST) into a series of executable pipeline stages. @@ -136,6 +134,14 @@ impl PipelinePlanner { Ok(vec![Box::new(pipeline_stage)]) } + DataExpression::Output(output_expr) => { + match output_expr.get_output() { + OutputExpression::NamedSink(name) => { + Ok(vec![Box::new(RouteToPipelineStage::new(name.get_value()))]) + } + } + } + // TODO support other DataExpressions other => Err(Error::NotYetSupportedError { message: format!("data expression not yet supported {}", other.get_name()), diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs new file mode 100644 index 0000000000..d2e1ee714b --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use datafusion::execution::context::SessionContext; +use otap_df_pdata::OtapArrowRecords; + +use crate::error::{Error, Result}; +use crate::pipeline::PipelineStage; + +/// TODO docs +#[async_trait] +pub trait Router: Send + Sync { + /// TODO docs + async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; +} + +/// TODO docs +pub struct RouterProvider { + router: Box, +} + +/// TODO comments +pub struct RouteToPipelineStage { + outport_name: String, +} + +impl RouteToPipelineStage { + /// TODO comments + pub fn new(outport_name: &str) -> Self { + Self { + outport_name: outport_name.to_string(), + } + } +} + +#[async_trait(?Send)] +impl PipelineStage for RouteToPipelineStage { + async fn execute( + &mut self, + mut otap_batch: OtapArrowRecords, + session_context: &SessionContext, + _config_options: &ConfigOptions, + task_context: Arc, + ) -> Result { + let router_provider = match task_context + .session_config() + .get_extension::() + { + Some(r) => r, + None => { + todo!("oops") + } + }; + + router_provider.router.send(&self.outport_name, otap_batch).await?; + + // return empty batch + todo!() + } +} From 991fef35681048d36fe3faf7eb204a8962063fed Mon Sep 17 00:00:00 2001 From: albertlockett Date: Sun, 11 Jan 2026 16:58:35 -0500 Subject: [PATCH 02/16] made progress on the routing interface type passing --- .../crates/otap/src/transform_processor.rs | 17 +++- .../otap/src/transform_processor/routing.rs | 29 ++++--- .../crates/query-engine/src/pipeline.rs | 21 ++++- .../query-engine/src/pipeline/attributes.rs | 2 + .../query-engine/src/pipeline/conditional.rs | 4 + .../query-engine/src/pipeline/filter.rs | 2 + .../query-engine/src/pipeline/planner.rs | 16 ++-- .../query-engine/src/pipeline/routing.rs | 22 +----- .../crates/query-engine/src/pipeline/state.rs | 78 +++++++++++++++++++ 9 files changed, 148 insertions(+), 43 deletions(-) create mode 100644 rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index d4b1cf4f36..cdcd6ff71f 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -32,11 +32,11 @@ use otap_df_engine::{ processor::ProcessorWrapper, }; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; -use otap_df_query_engine::pipeline::Pipeline; +use otap_df_query_engine::pipeline::{Pipeline, routing::Router, state::ExecutionState}; use otap_df_telemetry::metrics::MetricSet; use serde_json::Value; -use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata}; +use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata, transform_processor::routing::RouterImpl}; use self::config::Config; use self::metrics::Metrics; @@ -51,6 +51,7 @@ pub const TRANSFORM_PROCESSOR_URN: &str = "urn:otel:transform:processor"; /// Opentelemetry Processing Language Processor pub struct TransformProcessor { pipeline: Pipeline, + execution_state: ExecutionState, signal_scope: SignalScope, metrics: MetricSet, } @@ -111,10 +112,16 @@ impl TransformProcessor { // query engine here. Currently, validation happens lazily when the first batch is seen. // https://github.com/open-telemetry/otel-arrow/issues/1634 + let mut execution_state = ExecutionState::new(); + // TODO should `Box` be a type alias exposed by the routing module? + // so we don't accidentally set the wrong type here + execution_state.set_extension::>(Box::new(RouterImpl::new())); + Ok(Self { signal_scope: SignalScope::try_from(&pipeline_expr)?, pipeline: Pipeline::new(pipeline_expr), metrics: pipeline_ctx.register_metrics::(), + execution_state, }) } @@ -181,7 +188,11 @@ impl Processor for TransformProcessor { } else { let mut otap_batch: OtapArrowRecords = payload.try_into()?; otap_batch.decode_transport_optimized_ids()?; - match self.pipeline.execute(otap_batch).await { + match self + .pipeline + .execute_with_state(otap_batch, &mut self.execution_state) + .await + { Ok(otap_batch) => { self.metrics.msgs_transformed.inc(); otap_batch.into() diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs index f52e73d8ae..de6f4e51c9 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs @@ -5,25 +5,30 @@ use async_trait::async_trait; use otap_df_engine::local::processor::EffectHandler; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; use otap_df_query_engine::{ - error::{Result, Error}, - pipeline::routing::{RouterProvider, Router} + error::{Error, Result}, + pipeline::routing::Router, }; use crate::pdata::{Context, OtapPdata}; -struct RouterImpl { - effect_handler: EffectHandler +pub struct RouterImpl {} + +impl RouterImpl { + pub fn new() -> Self { + Self {} + } } #[async_trait] impl Router for RouterImpl { async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { - // TODO this isn't the correct handling for context - let pdata = OtapPdata::new( - Context::default(), - OtapPayload::OtapArrowRecords(otap_batch) - ); - self.effect_handler.send_message_to(route_name, pdata).await?; - Ok(()) + todo!() + // // TODO this isn't the correct handling for context + // let pdata = OtapPdata::new( + // Context::default(), + // OtapPayload::OtapArrowRecords(otap_batch) + // ); + // self.effect_handler.send_message_to(route_name, pdata).await?; + // Ok(()) } -} \ No newline at end of file +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs index 7d03fc5d97..e3ac1f28a1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs @@ -4,8 +4,6 @@ //! This module defines the top-level API for executing data transformation pipelines on //! streaming telemetry data in the OTAP columnar format. -use std::sync::Arc; - use arrow::compute::concat_batches; use async_trait::async_trait; use data_engine_expressions::PipelineExpression; @@ -18,9 +16,11 @@ use datafusion::physical_plan::streaming::PartitionStream; use datafusion::physical_plan::{ExecutionPlan, execute_stream}; use otap_df_pdata::OtapArrowRecords; use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; +use std::sync::Arc; use crate::error::{Error, Result}; use crate::pipeline::planner::PipelinePlanner; +use crate::pipeline::state::ExecutionState; use crate::table::RecordBatchPartitionStream; mod attributes; @@ -31,6 +31,8 @@ mod planner; /// TODO docs pub mod routing; +/// TODO docs +pub mod state; /// A stage in the pipeline. /// @@ -58,6 +60,7 @@ pub trait PipelineStage { session_context: &SessionContext, config_options: &ConfigOptions, task_context: Arc, + exec_options: &mut ExecutionState, ) -> Result; } @@ -86,6 +89,7 @@ impl PipelineStage for DataFusionPipelineStage { _session_context: &SessionContext, _config_options: &ConfigOptions, task_context: Arc, + _execution_options: &mut ExecutionState, ) -> Result { let rb = match otap_batch.get(self.payload_type) { Some(rb) => rb, @@ -212,7 +216,17 @@ impl Pipeline { /// /// # Returns /// The transformed telemetry data after all stages have executed - pub async fn execute(&mut self, mut otap_batch: OtapArrowRecords) -> Result { + pub async fn execute(&mut self, otap_batch: OtapArrowRecords) -> Result { + let mut exec_state = ExecutionState::default(); + self.execute_with_state(otap_batch, &mut exec_state).await + } + + /// TODO comments + pub async fn execute_with_state( + &mut self, + mut otap_batch: OtapArrowRecords, + exec_state: &mut ExecutionState, + ) -> Result { // lazily plan the pipeline if have not already done so if self.planned_pipeline.is_none() { let session_ctx = Self::create_session_context(); @@ -234,6 +248,7 @@ impl Pipeline { &pipeline.session_context, pipeline.config_options.as_ref(), pipeline.task_context.clone(), + exec_state, ) .await?; } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs index 860ee1d954..8207d5641b 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use crate::error::{Error, Result}; use crate::pipeline::PipelineStage; use crate::pipeline::planner::AttributesIdentifier; +use crate::pipeline::state::ExecutionState; /// This pipeline stage can be used to rename and delete attributes according to the transformation /// specified by the [`AttributesTransform`] @@ -40,6 +41,7 @@ impl PipelineStage for AttributeTransformPipelineStage { _session_context: &SessionContext, _config_options: &ConfigOptions, _task_context: Arc, + _exec_state: &mut ExecutionState, ) -> Result { let attrs_payload_type = match self.attrs_id { AttributesIdentifier::Root => match otap_batch { diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs index 3caee196d7..a32a55a796 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs @@ -18,6 +18,7 @@ use otap_df_pdata::otap::{Logs, Metrics, Traces}; use crate::error::Result; use crate::pipeline::filter::{Composite, FilterExec, filter_otap_batch}; +use crate::pipeline::state::ExecutionState; use crate::pipeline::{BoxedPipelineStage, PipelineStage}; /// This [`PipelineStage`] implementation will conditionally apply child pipeline stages on rows @@ -88,6 +89,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx: &SessionContext, config_options: &ConfigOptions, task_context: Arc, + exec_state: &mut ExecutionState, ) -> Result { let root_batch = match otap_batch.root_record_batch() { Some(root_batch) => root_batch, @@ -146,6 +148,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx, config_options, task_context.clone(), + exec_state, ) .await?; } @@ -167,6 +170,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx, config_options, task_context.clone(), + exec_state, ) .await?; } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs index d954ad4b35..54387536ce 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs @@ -44,6 +44,7 @@ use crate::pipeline::planner::{ AttributesIdentifier, BinaryArg, ColumnAccessor, try_attrs_value_filter_from_literal, try_static_scalar_to_attr_literal, try_static_scalar_to_literal_for_column, }; +use crate::pipeline::state::ExecutionState; pub mod optimize; @@ -1711,6 +1712,7 @@ impl PipelineStage for FilterPipelineStage { session_context: &SessionContext, _config_options: &ConfigOptions, _task_context: Arc, + _exec_state: &mut ExecutionState, ) -> Result { if otap_batch.root_record_batch().is_none() { // if batch is empty, no filtering to do diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs index 78783c74f3..d789efb3f6 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs @@ -4,7 +4,11 @@ //! This module contains code for planning pipeline execution use data_engine_expressions::{ - BooleanValue, DataExpression, DateTimeValue, DoubleValue, Expression, IntegerValue, LogicalExpression, MapSelector, MoveTransformExpression, MutableValueExpression, OutputExpression, PipelineExpression, ReduceMapTransformExpression, RenameMapKeysTransformExpression, ScalarExpression, StaticScalarExpression, StringValue, TransformExpression, ValueAccessor + BooleanValue, DataExpression, DateTimeValue, DoubleValue, Expression, IntegerValue, + LogicalExpression, MapSelector, MoveTransformExpression, MutableValueExpression, + OutputExpression, PipelineExpression, ReduceMapTransformExpression, + RenameMapKeysTransformExpression, ScalarExpression, StaticScalarExpression, StringValue, + TransformExpression, ValueAccessor, }; use datafusion::logical_expr::{BinaryExpr, Expr, Operator, col, lit}; use datafusion::prelude::{SessionContext, lit_timestamp_nano}; @@ -134,13 +138,11 @@ impl PipelinePlanner { Ok(vec![Box::new(pipeline_stage)]) } - DataExpression::Output(output_expr) => { - match output_expr.get_output() { - OutputExpression::NamedSink(name) => { - Ok(vec![Box::new(RouteToPipelineStage::new(name.get_value()))]) - } + DataExpression::Output(output_expr) => match output_expr.get_output() { + OutputExpression::NamedSink(name) => { + Ok(vec![Box::new(RouteToPipelineStage::new(name.get_value()))]) } - } + }, // TODO support other DataExpressions other => Err(Error::NotYetSupportedError { diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs index d2e1ee714b..d10a6b563f 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -11,19 +11,15 @@ use otap_df_pdata::OtapArrowRecords; use crate::error::{Error, Result}; use crate::pipeline::PipelineStage; +use crate::pipeline::state::ExecutionState; /// TODO docs #[async_trait] -pub trait Router: Send + Sync { +pub trait Router { /// TODO docs async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; } -/// TODO docs -pub struct RouterProvider { - router: Box, -} - /// TODO comments pub struct RouteToPipelineStage { outport_name: String, @@ -46,19 +42,9 @@ impl PipelineStage for RouteToPipelineStage { session_context: &SessionContext, _config_options: &ConfigOptions, task_context: Arc, + _exec_state: &mut ExecutionState, ) -> Result { - let router_provider = match task_context - .session_config() - .get_extension::() - { - Some(r) => r, - None => { - todo!("oops") - } - }; - - router_provider.router.send(&self.outport_name, otap_batch).await?; - + // route otap_batch to self.outport_name // return empty batch todo!() } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs new file mode 100644 index 0000000000..0b38a8242b --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::hash::{BuildHasherDefault, Hasher}; + +/// Additional state that may be carried along during the execution of a pipeline. +#[derive(Default)] +pub struct ExecutionState { + // TODO comments + extensions: Option, +} + +impl ExecutionState { + /// Create new execution options. + pub fn new() -> Self { + Self { extensions: None } + } + + /// Get extension of type T, if it exists. + pub fn get_extension(&self) -> Option<&T> { + self.extensions.as_ref().and_then(|map| { + map.get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref::()) + }) + } + + /// Get mutable extension of type T, if it exists. + pub fn get_extension_mut(&mut self) -> Option<&mut T> { + self.extensions.as_mut().and_then(|map| { + map.get_mut(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_mut::()) + }) + } + + /// Set extension of type T. + pub fn set_extension(&mut self, value: T) { + let map = self + .extensions + .get_or_insert_with(|| ExtensionMap::default()); + _ = map.insert(TypeId::of::(), Box::new(value)); + } +} + +/// Map that holds opaque objects indexed by their type. +/// +/// Note: this is similar to datafusion's `AnyMap`, which it also uses for extensions, but +/// without the `Send + Sync` bounds, as those are not required in this context due to these +/// pipeline stages executing in a single threaded runtime. This also means that pipeline +/// stages can get mutable references to extensions if needed. +type ExtensionMap = HashMap, BuildHasherDefault>; + +/// Hasher for [`ExtensionMap`]. +/// +/// This is the same as the one used by datafusion's `AnyMap`. +/// +/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, +/// coming from the compiler. The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then +/// returns it, instead of doing any bit fiddling. +#[derive(Default)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} From eb4c00f6bd12d0c351707c5f5f41bf48e3c540b9 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 12 Jan 2026 08:15:40 -0500 Subject: [PATCH 03/16] added implementation of router pipeline stage plus test --- .../expressions/src/data_expressions.rs | 2 - .../otap/src/transform_processor/routing.rs | 13 +- .../query-engine/src/pipeline/routing.rs | 111 ++++++++++++++++-- 3 files changed, 115 insertions(+), 11 deletions(-) diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index 98d43af6b8..ae04b1d00d 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -1,8 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -use std::process::Output; - use crate::*; #[derive(Debug, Clone, PartialEq)] diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs index de6f4e51c9..aeb0affb62 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +use std::any::Any; + use async_trait::async_trait; use otap_df_engine::local::processor::EffectHandler; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; @@ -21,7 +23,16 @@ impl RouterImpl { #[async_trait] impl Router for RouterImpl { - async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { todo!() // // TODO this isn't the correct handling for context // let pdata = OtapPdata::new( diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs index d10a6b563f..2087b074bb 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -1,6 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +use std::any::Any; use std::sync::Arc; use async_trait::async_trait; @@ -8,6 +9,7 @@ use datafusion::config::ConfigOptions; use datafusion::execution::TaskContext; use datafusion::execution::context::SessionContext; use otap_df_pdata::OtapArrowRecords; +use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; use crate::error::{Error, Result}; use crate::pipeline::PipelineStage; @@ -17,7 +19,13 @@ use crate::pipeline::state::ExecutionState; #[async_trait] pub trait Router { /// TODO docs - async fn send(&self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; + fn as_any(&self) -> &dyn Any; + + /// TODO dodcs + fn as_any_mut(&mut self) -> &mut dyn Any; + + /// TODO docs + async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; } /// TODO comments @@ -38,14 +46,101 @@ impl RouteToPipelineStage { impl PipelineStage for RouteToPipelineStage { async fn execute( &mut self, - mut otap_batch: OtapArrowRecords, - session_context: &SessionContext, + otap_batch: OtapArrowRecords, + _session_context: &SessionContext, _config_options: &ConfigOptions, - task_context: Arc, - _exec_state: &mut ExecutionState, + _task_context: Arc, + exec_state: &mut ExecutionState, ) -> Result { - // route otap_batch to self.outport_name - // return empty batch - todo!() + let root_payload_type = otap_batch.root_payload_type(); + match exec_state.get_extension_mut::>() { + Some(router) => { + router.send(&self.outport_name, otap_batch).await?; + } + None => { + return Err(Error::ExecutionError { + cause: "No router extension found in execution state".to_string(), + }); + } + } + + // emit empty batch + Ok(match root_payload_type { + ArrowPayloadType::Spans => OtapArrowRecords::Traces(Default::default()), + ArrowPayloadType::Logs => OtapArrowRecords::Logs(Default::default()), + _ => OtapArrowRecords::Metrics(Default::default()), + }) + } +} + +#[cfg(test)] +mod test { + use data_engine_expressions::{ + DataExpression, OutputDataExpression, OutputExpression, PipelineExpressionBuilder, + QueryLocation, StringScalarExpression, + }; + use otap_df_pdata::otap::Logs; + + use crate::pipeline::Pipeline; + + use super::*; + + struct TestRouter { + routed: Vec<(String, OtapArrowRecords)>, + } + + #[async_trait] + impl Router for TestRouter { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { + self.routed.push((route_name.to_string(), otap_batch)); + Ok(()) + } + } + + #[tokio::test] + async fn test_route_to_pipeline_stage() { + let output_expr = OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_sink", + )), + ); + let pipeline_expr = PipelineExpressionBuilder::new("test") + .with_expressions(vec![DataExpression::Output(output_expr)]) + .build() + .unwrap(); + let mut pipeline = Pipeline::new(pipeline_expr); + + let mut exec_state = ExecutionState::new(); + let test_router = TestRouter { routed: vec![] }; + exec_state.set_extension::>(Box::new(test_router)); + + // TODO maybe not test with empty so we can make some verifications of the data + let otap_batch = OtapArrowRecords::Logs(Logs::default()); + let result = pipeline + .execute_with_state(otap_batch, &mut exec_state) + .await + .unwrap(); + let empty_batch = OtapArrowRecords::Logs(Logs::default()); + assert_eq!(result, empty_batch); + + let router = exec_state.get_extension_mut::>().unwrap(); + + match router.as_any_mut().downcast_mut::() { + Some(test_router) => { + assert_eq!(test_router.routed.len(), 1); + assert_eq!(test_router.routed[0].0, "test_sink"); + } + None => panic!("Failed to downcast router to TestRouter"), + } } } From f651f7f8eb7bd7c9066776fbf7e20473328e29ed Mon Sep 17 00:00:00 2001 From: albertlockett Date: Mon, 12 Jan 2026 10:38:33 -0500 Subject: [PATCH 04/16] added comments and error case test to router pipeline stage --- .../crates/otap/src/transform_processor.rs | 5 ++ .../query-engine/src/pipeline/routing.rs | 61 ++++++++++++++++--- .../crates/query-engine/src/pipeline/state.rs | 32 +++++++--- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index cdcd6ff71f..6b797ea725 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -492,4 +492,9 @@ mod test { }) .validate(|_ctx| async move {}) } + + #[test] + fn test_pipeline_with_route_to() { + todo!() + } } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs index 2087b074bb..2ddaf395b7 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -15,29 +15,42 @@ use crate::error::{Error, Result}; use crate::pipeline::PipelineStage; use crate::pipeline::state::ExecutionState; -/// TODO docs +/// A trait for routing OTAP (OpenTelemetry Arrow Protocol) batch records to some destination. +/// +/// The pipeline stage that handles routing will look for an implementation of this trait +/// in the `ExecutionState` extensions, and invoke it to send the data to the appropriate route. +/// +/// The trait also provides methods to allow down-casting to concrete implementations. This means +/// the execution state can own the router as a trait object, but the pipeline caller can retrieve +/// it, downcast it to a concrete type, and inspect any state it may have or call other methods. +/// This is useful for implementations that buffer batches before sending them. +/// #[async_trait] pub trait Router { - /// TODO docs + /// returns a reference as `Any` for down-casting fn as_any(&self) -> &dyn Any; - /// TODO dodcs + /// returns a mutable reference as `Any` for down-casting fn as_any_mut(&mut self) -> &mut dyn Any; - /// TODO docs + /// Send OTAP batch to the specified route. async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; } -/// TODO comments +/// [`PipelineStage`] that routes OTAP batches to a specified route. +/// +/// This stage looks for a `Router` implementation in the `ExecutionState` extensions, +/// and invokes it to send the batch to the specified route. [`Pipeline`s](super::Pipeline) +/// that include this stage must ensure that a `Router` is set in the execution state. pub struct RouteToPipelineStage { - outport_name: String, + route_name: String, } impl RouteToPipelineStage { - /// TODO comments - pub fn new(outport_name: &str) -> Self { + /// Create a new `RouteToPipelineStage` that routes to the specified route name. + pub fn new(route_name: &str) -> Self { Self { - outport_name: outport_name.to_string(), + route_name: route_name.to_string(), } } } @@ -55,7 +68,7 @@ impl PipelineStage for RouteToPipelineStage { let root_payload_type = otap_batch.root_payload_type(); match exec_state.get_extension_mut::>() { Some(router) => { - router.send(&self.outport_name, otap_batch).await?; + router.send(&self.route_name, otap_batch).await?; } None => { return Err(Error::ExecutionError { @@ -143,4 +156,32 @@ mod test { None => panic!("Failed to downcast router to TestRouter"), } } + + #[tokio::test] + async fn test_route_to_pipeline_stage_no_router() { + let output_expr = OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_sink", + )), + ); + let pipeline_expr = PipelineExpressionBuilder::new("test") + .with_expressions(vec![DataExpression::Output(output_expr)]) + .build() + .unwrap(); + let mut pipeline = Pipeline::new(pipeline_expr); + let mut exec_state = ExecutionState::new(); + let otap_batch = OtapArrowRecords::Logs(Logs::default()); + let result = pipeline + .execute_with_state(otap_batch, &mut exec_state) + .await; + + match result { + Err(Error::ExecutionError { cause }) => { + assert_eq!(cause, "No router extension found in execution state"); + } + _ => panic!("Expected ExecutionError"), + } + } } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index 0b38a8242b..0803badb26 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -6,6 +6,20 @@ use std::collections::HashMap; use std::hash::{BuildHasherDefault, Hasher}; /// Additional state that may be carried along during the execution of a pipeline. +/// +/// This can be used to store arbitrary extensions that may be needed by custom pipeline stages. +/// Extensions are stored in a type-map and pipeline stages can retrieve them by a known type. +/// +// Note: this is similar to datafusion's `ExecutionState`, which it also uses for extensions, but +// without the need for `Send + Sync` bounds, as those are not required in this context due to these +// pipeline stages executing in a single threaded runtime. This also means that pipeline +// stages can get mutable references to extensions if needed. +// +// In the future, this may be expanded to include other execution-related state like metrics +// +// Also note that when the pipeline is executed without an ExecutionState being provided, a default +// one will be created. This means that anything added to this in the future should be inexpensive +// to initialize in the default case. #[derive(Default)] pub struct ExecutionState { // TODO comments @@ -45,19 +59,19 @@ impl ExecutionState { /// Map that holds opaque objects indexed by their type. /// -/// Note: this is similar to datafusion's `AnyMap`, which it also uses for extensions, but -/// without the `Send + Sync` bounds, as those are not required in this context due to these -/// pipeline stages executing in a single threaded runtime. This also means that pipeline -/// stages can get mutable references to extensions if needed. +// Note: this is similar to datafusion's `AnyMap`, which it also uses for extensions, but +// without the `Send + Sync` bounds, as those are not required in this context due to these +// pipeline stages executing in a single threaded runtime. This also means that pipeline +// stages can get mutable references to extensions if needed. type ExtensionMap = HashMap, BuildHasherDefault>; /// Hasher for [`ExtensionMap`]. /// -/// This is the same as the one used by datafusion's `AnyMap`. -/// -/// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, -/// coming from the compiler. The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then -/// returns it, instead of doing any bit fiddling. +// This is the same as the one used by datafusion's `AnyMap`. +// +// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, +// coming from the compiler. The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then +// returns it, instead of doing any bit fiddling. #[derive(Default)] struct IdHasher(u64); From bb91da508f3741fd306d7ecb5a65734f99aeaafc Mon Sep 17 00:00:00 2001 From: albertlockett Date: Tue, 13 Jan 2026 13:14:28 -0500 Subject: [PATCH 05/16] support route_to in opl parser --- rust/otap-dataflow/crates/opl/src/opl.pest | 5 ++ .../crates/opl/src/parser/operator.rs | 67 +++++++++++++++++-- 2 files changed, 68 insertions(+), 4 deletions(-) diff --git a/rust/otap-dataflow/crates/opl/src/opl.pest b/rust/otap-dataflow/crates/opl/src/opl.pest index f97cff4197..34d7ef8cf2 100644 --- a/rust/otap-dataflow/crates/opl/src/opl.pest +++ b/rust/otap-dataflow/crates/opl/src/opl.pest @@ -164,9 +164,14 @@ where_operator_call = { "where" ~ expression } +route_to_operator_call = { + "route_to" ~ string_literal +} + operator_call = { set_operator_call | if_else_operator_call + | route_to_operator_call | where_operator_call } diff --git a/rust/otap-dataflow/crates/opl/src/parser/operator.rs b/rust/otap-dataflow/crates/opl/src/parser/operator.rs index 5d0e86265c..63bbb64bda 100644 --- a/rust/otap-dataflow/crates/opl/src/parser/operator.rs +++ b/rust/otap-dataflow/crates/opl/src/parser/operator.rs @@ -4,9 +4,9 @@ use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, QueryLocation, - TransformExpression, + TransformExpression, OutputDataExpression, OutputExpression, StaticScalarExpression, }; -use data_engine_parser_abstractions::{ParserError, to_query_location}; +use data_engine_parser_abstractions::{ParserError, parse_standard_string_literal, to_query_location}; use pest::iterators::Pair; use crate::parser::assignment::parse_assignment_expression; @@ -20,8 +20,9 @@ pub(crate) fn parse_operator_call( ) -> Result<(), ParserError> { for rule in rule.into_inner() { match rule.as_rule() { - Rule::set_operator_call => parse_set_operator_call(rule, pipeline_builder)?, Rule::if_else_operator_call => parse_if_else_opeartor_call(rule, pipeline_builder)?, + Rule::route_to_operator_call => parse_route_to_operator_call(rule, pipeline_builder)?, + Rule::set_operator_call => parse_set_operator_call(rule, pipeline_builder)?, Rule::where_operator_call => parse_where_operator_call(rule, pipeline_builder)?, invalid_rule => { let query_location = to_query_location(&rule); @@ -37,6 +38,41 @@ pub(crate) fn parse_operator_call( Ok(()) } +pub(crate) fn parse_route_to_operator_call( + operator_call_rule: Pair<'_, Rule>, + pipeline_builder: &mut dyn PipelineBuilder, +) -> Result<(), ParserError> { + let query_location = to_query_location(&operator_call_rule); + if let Some(rule) = operator_call_rule.into_inner().next() { + let rule_query_location = to_query_location(&rule); + let dest = match rule.as_rule() { + Rule::string_literal => match parse_standard_string_literal(rule) { + StaticScalarExpression::String(string) => string, + invalid_expr => { + return Err(ParserError::SyntaxError( + rule_query_location, + format!("Expected static string literal, found {:?}", invalid_expr), + )); + } + }, + invalid_rule => { + let query_location = to_query_location(&rule); + return Err(invalid_child_rule_error( + query_location, + Rule::string_literal, + invalid_rule, + )); + } + }; + + let output_expr = + OutputDataExpression::new(query_location, OutputExpression::NamedSink(dest)); + pipeline_builder.push_data_expression(DataExpression::Output(output_expr)); + } + + Ok(()) +} + pub(crate) fn parse_set_operator_call( operator_call_rule: Pair<'_, Rule>, pipeline_builder: &mut dyn PipelineBuilder, @@ -211,7 +247,7 @@ mod tests { use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, DiscardDataExpression, EqualToLogicalExpression, LogicalExpression, MutableValueExpression, - NotLogicalExpression, QueryLocation, ScalarExpression, SetTransformExpression, + NotLogicalExpression, OutputDataExpression, OutputExpression, QueryLocation, ScalarExpression, SetTransformExpression, SourceScalarExpression, StaticScalarExpression, StringScalarExpression, TransformExpression, ValueAccessor, }; @@ -223,6 +259,29 @@ mod tests { use crate::parser::pest::OplPestParser; use crate::parser::{OplParser, Rule}; + #[test] + fn test_route_to_operator_call() { + let query = "route_to \"test_out_port\""; + let mut state = ParserState::new(query); + let parse_result = OplPestParser::parse(Rule::operator_call, query).unwrap(); + assert_eq!(parse_result.len(), 1); + let rule = parse_result.into_iter().next().unwrap(); + parse_operator_call(rule, &mut state).unwrap(); + let result = state.build().unwrap(); + let expressions = result.get_expressions(); + assert_eq!(expressions.len(), 1); + + let expected = DataExpression::Output(OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_out_port", + )), + )); + + assert_eq!(&expressions[0], &expected); + } + #[test] fn test_parse_set_operator_call() { let query = "set severity_text = \"ERROR\""; From 6424a52a6c2a8835d7e2aabdfc81d8925e164e77 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Tue, 13 Jan 2026 13:49:16 -0500 Subject: [PATCH 06/16] added tests for transform processor with route_to --- .../crates/engine/src/testing/processor.rs | 5 +- rust/otap-dataflow/crates/otap/Cargo.toml | 1 + .../crates/otap/src/transform_processor.rs | 111 ++++++++++++------ .../otap/src/transform_processor/config.rs | 19 ++- 4 files changed, 101 insertions(+), 35 deletions(-) diff --git a/rust/otap-dataflow/crates/engine/src/testing/processor.rs b/rust/otap-dataflow/crates/engine/src/testing/processor.rs index 0794e61387..2ddcd703d5 100644 --- a/rust/otap-dataflow/crates/engine/src/testing/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/testing/processor.rs @@ -118,6 +118,9 @@ impl ValidateContext { } } +/// The default outport port name that will be configured by the [`TestRuntime`] +pub const DEFAULT_OUT_PORT: &str = "out"; + /// A test runtime for simplifying processor tests. /// /// This structure encapsulates the common setup logic needed for testing processors, @@ -224,7 +227,7 @@ impl TestRuntime { // Set the output sender for the processor let _ = processor.set_pdata_sender( test_node(self.config().name.clone()), - "out".into(), + DEFAULT_OUT_PORT.into(), pdata_sender, ); diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index e58fc42bc6..3a2951a6cd 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -48,6 +48,7 @@ otap-df-engine = { path = "../engine" } otap-df-engine-macros = { path = "../engine-macros" } otap-df-channel = { path = "../channel" } otap-df-config = { path = "../config" } +otap-df-opl = { path = "../opl" } otap-df-pdata = { path = "../pdata" } otap-df-query-engine = { path = "../query-engine" } otap-df-telemetry = { path = "../telemetry" } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 6b797ea725..6d0a85c7a5 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -31,6 +31,7 @@ use otap_df_engine::{ node::NodeId, processor::ProcessorWrapper, }; +use otap_df_opl::parser::OplParser; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; use otap_df_query_engine::pipeline::{Pipeline, routing::Router, state::ExecutionState}; use otap_df_telemetry::metrics::MetricSet; @@ -38,7 +39,7 @@ use serde_json::Value; use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata, transform_processor::routing::RouterImpl}; -use self::config::Config; +use self::config::{Config, Language}; use self::metrics::Metrics; mod config; @@ -102,11 +103,16 @@ impl TransformProcessor { // TODO we should pass some context to the parser so we can determine if there are valid // identifiers when checking the config: // https://github.com/open-telemetry/otel-arrow/issues/1530 - let pipeline_expr = KqlParser::parse(&config.query) - .map_err(|e| ConfigError::InvalidUserConfig { - error: format!("Could not parse TransformProcessor query: {e:?}"), - })? - .pipeline; + let pipeline_expr = match config.language { + Language::KQL => KqlParser::parse(&config.query), + Language::OPL => OplParser::parse(&config.query), + } + .map_err(|e| ConfigError::InvalidUserConfig { + error: format!("Could not parse TransformProcessor query: {e:?}"), + })? + .pipeline; + + println!("{pipeline_expr:?}"); // TODO: it would be nice if we could validate that the pipeline expr is supported by the // query engine here. Currently, validation happens lazily when the first batch is seen. @@ -224,16 +230,19 @@ mod test { use super::*; use serde_json::json; - use otap_df_config::node::NodeUserConfig; + use otap_df_config::{PortName, node::NodeUserConfig}; use otap_df_engine::{ context::ControllerContext, + local::message::LocalSender, + message::Sender, + node::NodeWithPDataSender, testing::{ - processor::{TestContext, TestRuntime}, + processor::{DEFAULT_OUT_PORT, TestContext, TestRuntime}, test_node, }, }; use otap_df_pdata::{ - proto::{ + otap::Logs, proto::{ OtlpProtoMessage, opentelemetry::{ arrow::v1::ArrowPayloadType, @@ -243,20 +252,22 @@ mod test { resource::v1::Resource, trace::v1::{ResourceSpans, ScopeSpans, Span, TracesData}, }, - }, - testing::round_trip::{otap_to_otlp, otlp_to_otap}, + }, testing::round_trip::{otap_to_otlp, otlp_to_otap} }; use crate::pdata::OtapPdata; fn try_create_with_query( query: &str, + language: Language, runtime: &TestRuntime, ) -> Result, ConfigError> { let mut node_config = NodeUserConfig::new_processor_config(TRANSFORM_PROCESSOR_URN); node_config.config = json!({ - "query": query + "query": query, + "language": language.to_string() }); + node_config.default_out_port = Some(DEFAULT_OUT_PORT.into()); let telemetry_registry_handle = runtime.metrics_registry(); let controller_context = ControllerContext::new(telemetry_registry_handle); @@ -274,7 +285,7 @@ mod test { #[test] fn test_unparsable_query_is_config_time_error() { let runtime = TestRuntime::::new(); - match try_create_with_query("logs | invalid operator", &runtime) { + match try_create_with_query("logs | invalid operator", Language::KQL, &runtime) { Err(e) => { assert!( e.to_string() @@ -293,7 +304,8 @@ mod test { let telemetry_registry = runtime.metrics_registry(); let metrics_reporter = runtime.metrics_reporter(); let query = "logs | where severity_text == \"ERROR\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let processor = + try_create_with_query(query, Language::KQL, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -428,7 +440,8 @@ mod test { // test ensure it will only operate on traces, but ignores other signals let runtime = TestRuntime::::new(); let query = "traces | where name == \"foo\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let processor = + try_create_with_query(query, Language::KQL, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -459,42 +472,74 @@ mod test { } #[test] - fn test_signal_scope_all() { + fn test_simple_route_to() { // test ensure it will only operate on all signals let runtime = TestRuntime::::new(); - let query = "signals | where name == \"foo\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let query = "logs | route_to \"test_port\""; + let mut processor = + try_create_with_query(query, Language::OPL, &runtime).expect("created processor"); + + let test_node_id = NodeId { + index: 1, + name: "test_node".into(), + }; + let (test_port_tx, test_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id, + PortName::from("test_port"), + Sender::Local(LocalSender::mpsc(test_port_tx)), + ) + .unwrap(); + + runtime .set_processor(processor) .run_test(|mut ctx| async move { - send_one_traces_one_metrics_same_names(&mut ctx).await; - let mut processed_pdata = ctx + let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + vec![LogRecord::build().severity_text("ERROR").finish()], + )], + )], + })); + let pdata = OtapPdata::new_default(otap_batch.into()); + ctx.process(Message::PData(pdata)) + .await + .expect("no process error"); + + let out = ctx .drain_pdata() .await .into_iter() .map(OtapPdata::payload) .map(OtapArrowRecords::try_from) .map(Result::unwrap); - let traces_batch = processed_pdata.next().expect("sent traces batch"); - let metrics_batch = processed_pdata.next().expect("sent metrics batch"); + let result = out + .into_iter() + .next() + .expect("one result"); + // expect we got an empty batch: + // TODO assert on the context? + assert_eq!(result, OtapArrowRecords::Logs(Logs::default())); - // assert one of the spans got filtered out - let spans = traces_batch - .get(ArrowPayloadType::Spans) - .expect("spans present"); - assert_eq!(spans.num_rows(), 1); + let mut routed = Vec::new(); + while let Ok(msg) = test_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + // TODO assert on the routed message + // TODO assert on routed context + // TODO assert Ack/Nack - // assert it also filtered out one of the metrics - let metrics = metrics_batch - .get(ArrowPayloadType::UnivariateMetrics) - .expect("metrics present"); - assert_eq!(metrics.num_rows(), 1); }) .validate(|_ctx| async move {}) } #[test] fn test_pipeline_with_route_to() { - todo!() + todo!() } } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs index 45147198e3..8d3fb87cb2 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs @@ -8,5 +8,22 @@ use serde::Deserialize; pub struct Config { /// the query that defines the transformation to be applied pub query: String, - // TODO - add section to allow transforms to be specified in OTTL + + /// the language that defines the transformation to be applied + pub language: Language, +} + +#[derive(Debug, Deserialize)] +pub enum Language { + OPL, + KQL, +} + +impl ToString for Language { + fn to_string(&self) -> String { + match self { + Language::OPL => "OPL".to_string(), + Language::KQL => "KQL".to_string(), + } + } } From 5d1214c837fbbaf1ad03707ddbd5ca4657c8c346 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 08:51:19 -0500 Subject: [PATCH 07/16] implemented Router impl in trasnform processor --- .../crates/otap/src/transform_processor.rs | 67 +++++++++++++++---- .../otap/src/transform_processor/routing.rs | 32 ++++----- .../query-engine/src/pipeline/routing.rs | 45 +++++++++---- .../crates/query-engine/src/pipeline/state.rs | 6 +- 4 files changed, 100 insertions(+), 50 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 6d0a85c7a5..b363424ff6 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -16,10 +16,11 @@ use std::sync::Arc; use async_trait::async_trait; +use base64::Engine; use data_engine_expressions::{Expression, PipelineExpression}; use data_engine_kql_parser::{KqlParser, Parser}; use linkme::distributed_slice; -use otap_df_config::{SignalType, error::Error as ConfigError, node::NodeUserConfig}; +use otap_df_config::{PortName, SignalType, error::Error as ConfigError, node::NodeUserConfig}; use otap_df_engine::{ ProcessorFactory, config::ProcessorConfig, @@ -33,11 +34,19 @@ use otap_df_engine::{ }; use otap_df_opl::parser::OplParser; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; -use otap_df_query_engine::pipeline::{Pipeline, routing::Router, state::ExecutionState}; +use otap_df_query_engine::pipeline::{ + Pipeline, + routing::{Router, RouterExtType}, + state::ExecutionState, +}; use otap_df_telemetry::metrics::MetricSet; use serde_json::Value; -use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata, transform_processor::routing::RouterImpl}; +use crate::{ + OTAP_PROCESSOR_FACTORIES, + pdata::{Context, OtapPdata}, + transform_processor::routing::RouterImpl, +}; use self::config::{Config, Language}; use self::metrics::Metrics; @@ -119,9 +128,7 @@ impl TransformProcessor { // https://github.com/open-telemetry/otel-arrow/issues/1634 let mut execution_state = ExecutionState::new(); - // TODO should `Box` be a type alias exposed by the routing module? - // so we don't accidentally set the wrong type here - execution_state.set_extension::>(Box::new(RouterImpl::new())); + execution_state.set_extension::(Box::new(RouterImpl::new())); Ok(Self { signal_scope: SignalScope::try_from(&pipeline_expr)?, @@ -138,6 +145,40 @@ impl TransformProcessor { SignalScope::Signal(signal_type) => signal_type == pdata.signal_type(), } } + + async fn handle_routed_messages( + &mut self, + effect_handler: &mut EffectHandler, + ) -> Result<(), EngineError> { + let router_impl = self + .execution_state + .get_extension_mut::() + .and_then(|router| router.as_any_mut().downcast_mut::()) + .unwrap(); // TODO no unwray + + for (route_name, otap_batch) in router_impl.routed.drain(..) { + let payload = OtapPayload::OtapArrowRecords(otap_batch); + // TODO -- need to properly handle Ack/Nack here by creating a new context, subscribing + // interests, and juggling the incoming/outgoing contexts/Ack/Nack messages correctly + let pdata = OtapPdata::new(Context::default(), payload); + + // Find the port name based on the route name. We need to since do this, rather + // than just passing route_name directly, b/c `send_message_to` expects a `PortName` + // which must be Into for TransformProcessor { { Ok(otap_batch) => { self.metrics.msgs_transformed.inc(); + self.handle_routed_messages(effect_handler).await?; otap_batch.into() } Err(e) => { @@ -242,7 +284,8 @@ mod test { }, }; use otap_df_pdata::{ - otap::Logs, proto::{ + otap::Logs, + proto::{ OtlpProtoMessage, opentelemetry::{ arrow::v1::ArrowPayloadType, @@ -252,7 +295,8 @@ mod test { resource::v1::Resource, trace::v1::{ResourceSpans, ScopeSpans, Span, TracesData}, }, - }, testing::round_trip::{otap_to_otlp, otlp_to_otap} + }, + testing::round_trip::{otap_to_otlp, otlp_to_otap}, }; use crate::pdata::OtapPdata; @@ -492,7 +536,6 @@ mod test { ) .unwrap(); - runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -517,10 +560,7 @@ mod test { .map(OtapPdata::payload) .map(OtapArrowRecords::try_from) .map(Result::unwrap); - let result = out - .into_iter() - .next() - .expect("one result"); + let result = out.into_iter().next().expect("one result"); // expect we got an empty batch: // TODO assert on the context? assert_eq!(result, OtapArrowRecords::Logs(Logs::default())); @@ -533,7 +573,6 @@ mod test { // TODO assert on the routed message // TODO assert on routed context // TODO assert Ack/Nack - }) .validate(|_ctx| async move {}) } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs index aeb0affb62..df5fdfe959 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs @@ -4,26 +4,25 @@ use std::any::Any; use async_trait::async_trait; -use otap_df_engine::local::processor::EffectHandler; -use otap_df_pdata::{OtapArrowRecords, OtapPayload}; +use otap_df_pdata::OtapArrowRecords; use otap_df_query_engine::{ - error::{Error, Result}, - pipeline::routing::Router, + error::Result, + pipeline::routing::{RouteName, Router}, }; -use crate::pdata::{Context, OtapPdata}; - -pub struct RouterImpl {} +/// implementation of [`Router`] used by [`TransformProcessor`] +pub(super) struct RouterImpl { + pub routed: Vec<(RouteName, OtapArrowRecords)>, +} impl RouterImpl { pub fn new() -> Self { - Self {} + Self { routed: Vec::new() } } } -#[async_trait] +#[async_trait(?Send)] impl Router for RouterImpl { - fn as_any(&self) -> &dyn Any { self } @@ -32,14 +31,9 @@ impl Router for RouterImpl { self } - async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { - todo!() - // // TODO this isn't the correct handling for context - // let pdata = OtapPdata::new( - // Context::default(), - // OtapPayload::OtapArrowRecords(otap_batch) - // ); - // self.effect_handler.send_message_to(route_name, pdata).await?; - // Ok(()) + async fn send(&mut self, route_name: RouteName, otap_batch: OtapArrowRecords) -> Result<()> { + self.routed.push((route_name, otap_batch)); + + Ok(()) } } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs index 2ddaf395b7..33a59b7452 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::any::Any; +use std::rc::Rc; use std::sync::Arc; use async_trait::async_trait; @@ -19,13 +20,13 @@ use crate::pipeline::state::ExecutionState; /// /// The pipeline stage that handles routing will look for an implementation of this trait /// in the `ExecutionState` extensions, and invoke it to send the data to the appropriate route. -/// +/// /// The trait also provides methods to allow down-casting to concrete implementations. This means /// the execution state can own the router as a trait object, but the pipeline caller can retrieve /// it, downcast it to a concrete type, and inspect any state it may have or call other methods. /// This is useful for implementations that buffer batches before sending them. /// -#[async_trait] +#[async_trait(?Send)] pub trait Router { /// returns a reference as `Any` for down-casting fn as_any(&self) -> &dyn Any; @@ -34,23 +35,35 @@ pub trait Router { fn as_any_mut(&mut self) -> &mut dyn Any; /// Send OTAP batch to the specified route. - async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()>; + async fn send(&mut self, route_name: RouteName, otap_batch: OtapArrowRecords) -> Result<()>; } -/// [`PipelineStage`] that routes OTAP batches to a specified route. -/// +/// Route name type +// `Rc` is used here for `Router` implementations that buffer batches to route, so they could own the +// route name without having to clone a string +pub type RouteName = Rc; + +/// Extension type for `Router` implementations used by this pipeline stage. +/// +/// For [`Pipeline`](super::Pipeline) callers that invoke pipeline supporting routing outputs, +/// this type is used to provide a reference to the `Router` implementation in the +/// `ExecutionState` extensions. +pub type RouterExtType = Box; + +/// [`PipelineStage`] that routes OTAP batches to a specified route. +/// /// This stage looks for a `Router` implementation in the `ExecutionState` extensions, /// and invokes it to send the batch to the specified route. [`Pipeline`s](super::Pipeline) /// that include this stage must ensure that a `Router` is set in the execution state. pub struct RouteToPipelineStage { - route_name: String, + route_name: RouteName, } impl RouteToPipelineStage { /// Create a new `RouteToPipelineStage` that routes to the specified route name. pub fn new(route_name: &str) -> Self { Self { - route_name: route_name.to_string(), + route_name: route_name.to_string().into(), } } } @@ -66,9 +79,9 @@ impl PipelineStage for RouteToPipelineStage { exec_state: &mut ExecutionState, ) -> Result { let root_payload_type = otap_batch.root_payload_type(); - match exec_state.get_extension_mut::>() { + match exec_state.get_extension_mut::() { Some(router) => { - router.send(&self.route_name, otap_batch).await?; + router.send(self.route_name.clone(), otap_batch).await?; } None => { return Err(Error::ExecutionError { @@ -99,10 +112,10 @@ mod test { use super::*; struct TestRouter { - routed: Vec<(String, OtapArrowRecords)>, + routed: Vec<(RouteName, OtapArrowRecords)>, } - #[async_trait] + #[async_trait(?Send)] impl Router for TestRouter { fn as_any(&self) -> &dyn Any { self @@ -112,8 +125,12 @@ mod test { self } - async fn send(&mut self, route_name: &str, otap_batch: OtapArrowRecords) -> Result<()> { - self.routed.push((route_name.to_string(), otap_batch)); + async fn send( + &mut self, + route_name: RouteName, + otap_batch: OtapArrowRecords, + ) -> Result<()> { + self.routed.push((route_name, otap_batch)); Ok(()) } } @@ -151,7 +168,7 @@ mod test { match router.as_any_mut().downcast_mut::() { Some(test_router) => { assert_eq!(test_router.routed.len(), 1); - assert_eq!(test_router.routed[0].0, "test_sink"); + assert_eq!(test_router.routed[0].0.as_str(), "test_sink"); } None => panic!("Failed to downcast router to TestRouter"), } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index 0803badb26..f26eb6665d 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -6,15 +6,15 @@ use std::collections::HashMap; use std::hash::{BuildHasherDefault, Hasher}; /// Additional state that may be carried along during the execution of a pipeline. -/// +/// /// This can be used to store arbitrary extensions that may be needed by custom pipeline stages. /// Extensions are stored in a type-map and pipeline stages can retrieve them by a known type. -/// +/// // Note: this is similar to datafusion's `ExecutionState`, which it also uses for extensions, but // without the need for `Send + Sync` bounds, as those are not required in this context due to these // pipeline stages executing in a single threaded runtime. This also means that pipeline // stages can get mutable references to extensions if needed. -// +// // In the future, this may be expanded to include other execution-related state like metrics // // Also note that when the pipeline is executed without an ExecutionState being provided, a default From 735eed2504623cf9710a54d08413d0f1e482d979 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 11:08:05 -0500 Subject: [PATCH 08/16] code cleanup in data expression --- .../expressions/src/data_expressions.rs | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index ae04b1d00d..6edbb64dc4 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -303,8 +303,7 @@ impl ConditionalDataExpressionBranch { } } - -/// TODO comments +/// Data expression representing an operation that emits data to a sink. #[derive(Debug, Clone, PartialEq)] pub struct OutputDataExpression { query_location: QueryLocation, @@ -323,10 +322,7 @@ impl OutputDataExpression { &self.output } - pub fn try_fold( - &mut self, - _scope: &PipelineResolutionScope, - ) -> Result<(), ExpressionError> { + pub fn try_fold(&mut self, _scope: &PipelineResolutionScope) -> Result<(), ExpressionError> { // No folding currently supported for output expressions. Ok(()) } @@ -341,18 +337,54 @@ impl Expression for OutputDataExpression { "OutputDataExpression" } - // TODO test this fn fmt_with_indent(&self, f: &mut std::fmt::Formatter<'_>, indent: &str) -> std::fmt::Result { writeln!(f, "Output:")?; write!(f, "{indent}└── ")?; match &self.output { - OutputExpression::NamedSink(expr) => expr.fmt_with_indent(f, format!("{indent} ").as_str()), + OutputExpression::NamedSink(expr) => { + expr.fmt_with_indent(f, format!("{indent} ").as_str()) + } } } } -/// TODO comments +/// Expression representing an operation that emits data to a sink. #[derive(Debug, Clone, PartialEq)] pub enum OutputExpression { - NamedSink(StringScalarExpression) + /// Output data to a sink identified by name. + // Currently this contains a static string because that is the only option supported. + // In the future could may support dynamic sink identified by a variable, result of a + // function call, or other some expression, at which point we could change this to the + // more general `StaticExpression`. + NamedSink(StringScalarExpression), +} + +#[cfg(test)] +mod test { + use super::*; + use std::fmt; + + // Helper struct to test fmt_with_indent by implementing Display + struct DisplayWrapper<'a, T: Expression>(&'a T, &'a str); + + impl<'a, T: Expression> fmt::Display for DisplayWrapper<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt_with_indent(f, self.1) + } + } + + #[test] + fn test_output_expression_fmt_with_indent() { + let string_expr = StringScalarExpression::new(QueryLocation::new_fake(), "sink_name"); + let output_expr = OutputExpression::NamedSink(string_expr.clone()); + let output_data_expr = OutputDataExpression::new(QueryLocation::new_fake(), output_expr); + let output = format!("{}", DisplayWrapper(&output_data_expr, "")); + assert_eq!( + output, + format!( + "Output:\n\ + └── {string_expr:?}\n" + ) + ); + } } From 3a44015f3f334751f45b5deaf2312c446a021373 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 11:45:53 -0500 Subject: [PATCH 09/16] more code cleanup --- .../expressions/src/data_expressions.rs | 8 +- .../crates/engine/src/testing/processor.rs | 7 +- .../crates/otap/src/transform_processor.rs | 138 ++++++++++++------ .../otap/src/transform_processor/config.rs | 24 +-- .../crates/query-engine/src/pipeline.rs | 21 ++- .../query-engine/src/pipeline/routing.rs | 6 + .../crates/query-engine/src/pipeline/state.rs | 2 + 7 files changed, 127 insertions(+), 79 deletions(-) diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index 6edbb64dc4..909df88af3 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -352,10 +352,10 @@ impl Expression for OutputDataExpression { #[derive(Debug, Clone, PartialEq)] pub enum OutputExpression { /// Output data to a sink identified by name. - // Currently this contains a static string because that is the only option supported. - // In the future could may support dynamic sink identified by a variable, result of a - // function call, or other some expression, at which point we could change this to the - // more general `StaticExpression`. + // Currently this contains a static string because it's the only way we handle identifying + // where to output the data. In the future we could support dynamic sink identified by a + // variable, result of a function call, or other some expression, at which point we can change + // this to contain the more general `StaticExpression`. NamedSink(StringScalarExpression), } diff --git a/rust/otap-dataflow/crates/engine/src/testing/processor.rs b/rust/otap-dataflow/crates/engine/src/testing/processor.rs index 2ddcd703d5..aeed47ec1b 100644 --- a/rust/otap-dataflow/crates/engine/src/testing/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/testing/processor.rs @@ -118,8 +118,9 @@ impl ValidateContext { } } -/// The default outport port name that will be configured by the [`TestRuntime`] -pub const DEFAULT_OUT_PORT: &str = "out"; +/// The name of the out_port that will be configured automatically on the [`ProcessorWrapper`] by +/// the [`TestRuntime`]. +pub const TEST_OUT_PORT_NAME: &str = "out"; /// A test runtime for simplifying processor tests. /// @@ -227,7 +228,7 @@ impl TestRuntime { // Set the output sender for the processor let _ = processor.set_pdata_sender( test_node(self.config().name.clone()), - DEFAULT_OUT_PORT.into(), + TEST_OUT_PORT_NAME.into(), pdata_sender, ); diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index b363424ff6..9e30fcb381 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -16,11 +16,10 @@ use std::sync::Arc; use async_trait::async_trait; -use base64::Engine; use data_engine_expressions::{Expression, PipelineExpression}; use data_engine_kql_parser::{KqlParser, Parser}; use linkme::distributed_slice; -use otap_df_config::{PortName, SignalType, error::Error as ConfigError, node::NodeUserConfig}; +use otap_df_config::{SignalType, error::Error as ConfigError, node::NodeUserConfig}; use otap_df_engine::{ ProcessorFactory, config::ProcessorConfig, @@ -34,11 +33,7 @@ use otap_df_engine::{ }; use otap_df_opl::parser::OplParser; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; -use otap_df_query_engine::pipeline::{ - Pipeline, - routing::{Router, RouterExtType}, - state::ExecutionState, -}; +use otap_df_query_engine::pipeline::{Pipeline, routing::RouterExtType, state::ExecutionState}; use otap_df_telemetry::metrics::MetricSet; use serde_json::Value; @@ -48,7 +43,7 @@ use crate::{ transform_processor::routing::RouterImpl, }; -use self::config::{Config, Language}; +use self::config::{Config, Query}; use self::metrics::Metrics; mod config; @@ -112,17 +107,15 @@ impl TransformProcessor { // TODO we should pass some context to the parser so we can determine if there are valid // identifiers when checking the config: // https://github.com/open-telemetry/otel-arrow/issues/1530 - let pipeline_expr = match config.language { - Language::KQL => KqlParser::parse(&config.query), - Language::OPL => OplParser::parse(&config.query), + let pipeline_expr = match &config.query { + Query::KqlQuery(query) => KqlParser::parse(query), + Query::OplQuery(query) => OplParser::parse(query), } .map_err(|e| ConfigError::InvalidUserConfig { error: format!("Could not parse TransformProcessor query: {e:?}"), })? .pipeline; - println!("{pipeline_expr:?}"); - // TODO: it would be nice if we could validate that the pipeline expr is supported by the // query engine here. Currently, validation happens lazily when the first batch is seen. // https://github.com/open-telemetry/otel-arrow/issues/1634 @@ -154,12 +147,17 @@ impl TransformProcessor { .execution_state .get_extension_mut::() .and_then(|router| router.as_any_mut().downcast_mut::()) - .unwrap(); // TODO no unwray + .ok_or_else(|| EngineError::ProcessorError { + processor: effect_handler.processor_id(), + kind: ProcessorErrorKind::Other, + source_detail: "Router not found in pipeline exec state".into(), + error: "Routing error:".into(), + })?; for (route_name, otap_batch) in router_impl.routed.drain(..) { let payload = OtapPayload::OtapArrowRecords(otap_batch); // TODO -- need to properly handle Ack/Nack here by creating a new context, subscribing - // interests, and juggling the incoming/outgoing contexts/Ack/Nack messages correctly + // interests, and juggling the incoming/outgoing contexts & Ack/Nack messages correctly let pdata = OtapPdata::new(Context::default(), payload); // Find the port name based on the route name. We need to since do this, rather @@ -169,12 +167,15 @@ impl TransformProcessor { .connected_ports() .iter() .find(|p| p.as_ref() == route_name.as_str()) - .unwrap() // TODO no unwrap + .ok_or_else(|| EngineError::ProcessorError { + processor: effect_handler.processor_id(), + kind: ProcessorErrorKind::Transport, + error: "Routing error: ".into(), + source_detail: format!("out_port name {} not configured", route_name), + })? .clone(); - effect_handler - .send_message_to(port_name, pdata) - .await - .unwrap(); // TODO no unwrap + + effect_handler.send_message_to(port_name, pdata).await?; } Ok(()) @@ -279,7 +280,7 @@ mod test { message::Sender, node::NodeWithPDataSender, testing::{ - processor::{DEFAULT_OUT_PORT, TestContext, TestRuntime}, + processor::{TEST_OUT_PORT_NAME, TestContext, TestRuntime}, test_node, }, }; @@ -301,17 +302,13 @@ mod test { use crate::pdata::OtapPdata; - fn try_create_with_query( - query: &str, - language: Language, + fn try_create_with_config( + config: Value, runtime: &TestRuntime, ) -> Result, ConfigError> { let mut node_config = NodeUserConfig::new_processor_config(TRANSFORM_PROCESSOR_URN); - node_config.config = json!({ - "query": query, - "language": language.to_string() - }); - node_config.default_out_port = Some(DEFAULT_OUT_PORT.into()); + node_config.config = config; + node_config.default_out_port = Some(TEST_OUT_PORT_NAME.into()); let telemetry_registry_handle = runtime.metrics_registry(); let controller_context = ControllerContext::new(telemetry_registry_handle); @@ -326,10 +323,24 @@ mod test { ) } + fn try_create_with_kql_query( + query: &str, + runtime: &TestRuntime, + ) -> Result, ConfigError> { + try_create_with_config(json!({ "kql_query": query }), runtime) + } + + fn try_create_with_opl_query( + query: &str, + runtime: &TestRuntime, + ) -> Result, ConfigError> { + try_create_with_config(json!({ "opl_query": query }), runtime) + } + #[test] fn test_unparsable_query_is_config_time_error() { let runtime = TestRuntime::::new(); - match try_create_with_query("logs | invalid operator", Language::KQL, &runtime) { + match try_create_with_kql_query("logs | invalid operator", &runtime) { Err(e) => { assert!( e.to_string() @@ -348,8 +359,7 @@ mod test { let telemetry_registry = runtime.metrics_registry(); let metrics_reporter = runtime.metrics_reporter(); let query = "logs | where severity_text == \"ERROR\""; - let processor = - try_create_with_query(query, Language::KQL, &runtime).expect("created processor"); + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -484,8 +494,7 @@ mod test { // test ensure it will only operate on traces, but ignores other signals let runtime = TestRuntime::::new(); let query = "traces | where name == \"foo\""; - let processor = - try_create_with_query(query, Language::KQL, &runtime).expect("created processor"); + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -515,13 +524,47 @@ mod test { .validate(|_ctx| async move {}) } + #[test] + fn test_signal_scope_all() { + // test ensure it will only operate on all signals + let runtime = TestRuntime::::new(); + let query = "signals | where name == \"foo\""; + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); + runtime + .set_processor(processor) + .run_test(|mut ctx| async move { + send_one_traces_one_metrics_same_names(&mut ctx).await; + let mut processed_pdata = ctx + .drain_pdata() + .await + .into_iter() + .map(OtapPdata::payload) + .map(OtapArrowRecords::try_from) + .map(Result::unwrap); + let traces_batch = processed_pdata.next().expect("sent traces batch"); + let metrics_batch = processed_pdata.next().expect("sent metrics batch"); + + // assert one of the spans got filtered out + let spans = traces_batch + .get(ArrowPayloadType::Spans) + .expect("spans present"); + assert_eq!(spans.num_rows(), 1); + + // assert it also filtered out one of the metrics + let metrics = metrics_batch + .get(ArrowPayloadType::UnivariateMetrics) + .expect("metrics present"); + assert_eq!(metrics.num_rows(), 1); + }) + .validate(|_ctx| async move {}) + } + #[test] fn test_simple_route_to() { // test ensure it will only operate on all signals let runtime = TestRuntime::::new(); let query = "logs | route_to \"test_port\""; - let mut processor = - try_create_with_query(query, Language::OPL, &runtime).expect("created processor"); + let mut processor = try_create_with_opl_query(query, &runtime).expect("created processor"); let test_node_id = NodeId { index: 1, @@ -539,7 +582,7 @@ mod test { runtime .set_processor(processor) .run_test(|mut ctx| async move { - let otap_batch = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + let input = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { resource_logs: vec![ResourceLogs::new( Resource::default(), vec![ScopeLogs::new( @@ -548,7 +591,7 @@ mod test { )], )], })); - let pdata = OtapPdata::new_default(otap_batch.into()); + let pdata = OtapPdata::new_default(input.clone().into()); ctx.process(Message::PData(pdata)) .await .expect("no process error"); @@ -561,24 +604,25 @@ mod test { .map(OtapArrowRecords::try_from) .map(Result::unwrap); let result = out.into_iter().next().expect("one result"); + // expect we got an empty batch: - // TODO assert on the context? assert_eq!(result, OtapArrowRecords::Logs(Logs::default())); + // TODO when we support Ack/Nack here assert on the context of this message let mut routed = Vec::new(); while let Ok(msg) = test_port_rx.try_recv() { routed.push(msg); } assert_eq!(routed.len(), 1); - // TODO assert on the routed message - // TODO assert on routed context - // TODO assert Ack/Nack + let (_context, payload) = routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_eq!(result, input) + } + _ => panic!("unexpected payload type"), + } + // TODO when we support Ack/Nack here assert on routed context }) .validate(|_ctx| async move {}) } - - #[test] - fn test_pipeline_with_route_to() { - todo!() - } } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs index 8d3fb87cb2..3742439518 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs @@ -6,24 +6,14 @@ use serde::Deserialize; /// Configuration for the [`TransformProcessor`](super::TransformProcessor) #[derive(Debug, Deserialize)] pub struct Config { - /// the query that defines the transformation to be applied - pub query: String, - - /// the language that defines the transformation to be applied - pub language: Language, + #[serde(flatten)] + pub query: Query, } #[derive(Debug, Deserialize)] -pub enum Language { - OPL, - KQL, -} - -impl ToString for Language { - fn to_string(&self) -> String { - match self { - Language::OPL => "OPL".to_string(), - Language::KQL => "KQL".to_string(), - } - } +#[serde(rename_all = "snake_case")] +pub enum Query { + KqlQuery(String), + OplQuery(String), + // TODO - add section to allow transforms to be specified in OTTL } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs index e3ac1f28a1..17b92f4d1d 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs @@ -29,9 +29,7 @@ mod filter; mod functions; mod planner; -/// TODO docs pub mod routing; -/// TODO docs pub mod state; /// A stage in the pipeline. @@ -206,11 +204,6 @@ impl Pipeline { /// Execute the pipeline on a batch of telemetry data. /// - /// Any query planning happens during the first call to execute, including setting up any - /// DataFusion SessionContext, TaskContext, etc. Subsequent calls will not have to redo - /// the full planning, although individual stages may do light re-plannings to adapt to - /// changing OTAP batch schemas. - /// /// # Arguments /// - `otap_batch`: The input telemetry data to process /// @@ -221,7 +214,19 @@ impl Pipeline { self.execute_with_state(otap_batch, &mut exec_state).await } - /// TODO comments + /// Execute the pipeline on a batch of telemetry data, using the provided execution state. + /// + /// Any query planning happens during the first call to execute, including setting up any + /// DataFusion SessionContext, TaskContext, etc. Subsequent calls will not have to redo + /// the full planning, although individual stages may do light re-plannings to adapt to + /// changing OTAP batch schemas. + /// + /// # Arguments + /// - `otap_batch`: The input telemetry data to process + /// - `exec_state`: The execution state to use for the pipeline execution + /// + /// # Returns + /// The transformed telemetry data after all stages have executed pub async fn execute_with_state( &mut self, mut otap_batch: OtapArrowRecords, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs index 33a59b7452..1a425fee4b 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -1,6 +1,12 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +//! Routing for columnar query engine. +//! +//! This module contains the pipeline stage that OTAP batches to some destination. +//! The routing implementation is customizable, using the `Router` trait which the +//! pipeline stage implementation will use to send the data to the appropriate route. + use std::any::Any; use std::rc::Rc; use std::sync::Arc; diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index f26eb6665d..3e27fb2be7 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -1,6 +1,8 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +//! Execution state for columnar query engine. + use std::any::{Any, TypeId}; use std::collections::HashMap; use std::hash::{BuildHasherDefault, Hasher}; From be1237a1612c4614352108c18da864f2b2a8e737 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 11:47:35 -0500 Subject: [PATCH 10/16] fix comments in state --- rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index 3e27fb2be7..e20bb2440a 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -24,7 +24,6 @@ use std::hash::{BuildHasherDefault, Hasher}; // to initialize in the default case. #[derive(Default)] pub struct ExecutionState { - // TODO comments extensions: Option, } From 58396cd48b8adc33feeac3661a24788040e356ec Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 13:06:29 -0500 Subject: [PATCH 11/16] cleanup code --- .../crates/otap/src/transform_processor.rs | 4 +- .../query-engine/src/pipeline/routing.rs | 14 +-- .../crates/query-engine/src/pipeline/state.rs | 107 ++++++++++++++++-- 3 files changed, 105 insertions(+), 20 deletions(-) diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 9e30fcb381..0c02e8519c 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -160,9 +160,7 @@ impl TransformProcessor { // interests, and juggling the incoming/outgoing contexts & Ack/Nack messages correctly let pdata = OtapPdata::new(Context::default(), payload); - // Find the port name based on the route name. We need to since do this, rather - // than just passing route_name directly, b/c `send_message_to` expects a `PortName` - // which must be Into; /// Extension type for `Router` implementations used by this pipeline stage. /// -/// For [`Pipeline`](super::Pipeline) callers that invoke pipeline supporting routing outputs, -/// this type is used to provide a reference to the `Router` implementation in the -/// `ExecutionState` extensions. +/// For callers that invoke pipeline supporting routing outputs, this type is used to provide a reference +/// to the `Router` implementation in the`ExecutionState` extensions. pub type RouterExtType = Box; /// [`PipelineStage`] that routes OTAP batches to a specified route. /// -/// This stage looks for a `Router` implementation in the `ExecutionState` extensions, -/// and invokes it to send the batch to the specified route. [`Pipeline`s](super::Pipeline) -/// that include this stage must ensure that a `Router` is set in the execution state. +/// This stage looks for a `Router` implementation in the `ExecutionState` extensions, and invokes +/// it to send the batch to the specified route. Executors of [`Pipeline`s](super::Pipeline) that +/// include this stage must ensure that a `Router` is set in the execution state. pub struct RouteToPipelineStage { route_name: RouteName, } impl RouteToPipelineStage { /// Create a new `RouteToPipelineStage` that routes to the specified route name. + #[must_use] pub fn new(route_name: &str) -> Self { Self { route_name: route_name.to_string().into(), diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index e20bb2440a..810c7e38b1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -12,16 +12,17 @@ use std::hash::{BuildHasherDefault, Hasher}; /// This can be used to store arbitrary extensions that may be needed by custom pipeline stages. /// Extensions are stored in a type-map and pipeline stages can retrieve them by a known type. /// -// Note: this is similar to datafusion's `ExecutionState`, which it also uses for extensions, but -// without the need for `Send + Sync` bounds, as those are not required in this context due to these -// pipeline stages executing in a single threaded runtime. This also means that pipeline +/// This is similar to datafusion's `ExecutionState`, which it also uses for extensions, but +/// without the need for `Send + Sync` bounds, as those are not required in this context due to these +/// pipeline stages executing in a single threaded runtime. This also means that pipeline // stages can get mutable references to extensions if needed. // -// In the future, this may be expanded to include other execution-related state like metrics +// In the future, this may be expanded to include other execution-related state like metrics,or +// other state needed for stateful stream processing. // -// Also note that when the pipeline is executed without an ExecutionState being provided, a default -// one will be created. This means that anything added to this in the future should be inexpensive -// to initialize in the default case. +// When the pipeline is executed without an ExecutionState being provided, a default one will be +// created. This means that anything added to this in the should be inexpensive to initialize in +// the default case. #[derive(Default)] pub struct ExecutionState { extensions: Option, @@ -29,11 +30,13 @@ pub struct ExecutionState { impl ExecutionState { /// Create new execution options. + #[must_use] pub fn new() -> Self { Self { extensions: None } } /// Get extension of type T, if it exists. + #[must_use] pub fn get_extension(&self) -> Option<&T> { self.extensions.as_ref().and_then(|map| { map.get(&TypeId::of::()) @@ -42,6 +45,7 @@ impl ExecutionState { } /// Get mutable extension of type T, if it exists. + #[must_use] pub fn get_extension_mut(&mut self) -> Option<&mut T> { self.extensions.as_mut().and_then(|map| { map.get_mut(&TypeId::of::()) @@ -51,9 +55,7 @@ impl ExecutionState { /// Set extension of type T. pub fn set_extension(&mut self, value: T) { - let map = self - .extensions - .get_or_insert_with(|| ExtensionMap::default()); + let map = self.extensions.get_or_insert_with(ExtensionMap::default); _ = map.insert(TypeId::of::(), Box::new(value)); } } @@ -91,3 +93,88 @@ impl Hasher for IdHasher { self.0 } } + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq)] + struct TestExtension { + value: i32, + } + + #[derive(Debug, PartialEq)] + struct AnotherExtension { + name: String, + } + + #[test] + fn test_set_and_get_extension() { + let mut state = ExecutionState::new(); + + // Initially, extension should not exist + assert!(state.get_extension::().is_none()); + + // Set an extension + state.set_extension(TestExtension { value: 42 }); + + // Now we should be able to get it + let ext = state.get_extension::(); + assert!(ext.is_some()); + assert_eq!(ext.unwrap().value, 42); + } + + #[test] + fn test_get_extension_mut() { + let mut state = ExecutionState::new(); + + // Set an extension + state.set_extension(TestExtension { value: 10 }); + + // Get mutable reference and modify it + { + let ext = state.get_extension_mut::(); + assert!(ext.is_some()); + ext.unwrap().value = 20; + } + + // Verify the modification + let ext = state.get_extension::(); + assert_eq!(ext.unwrap().value, 20); + } + + #[test] + fn test_multiple_extensions() { + let mut state = ExecutionState::new(); + + // Set multiple different extensions + state.set_extension(TestExtension { value: 100 }); + state.set_extension(AnotherExtension { + name: "test".to_string(), + }); + + // Both should be retrievable independently + let test_ext = state.get_extension::(); + assert!(test_ext.is_some()); + assert_eq!(test_ext.unwrap().value, 100); + + let another_ext = state.get_extension::(); + assert!(another_ext.is_some()); + assert_eq!(another_ext.unwrap().name, "test"); + } + + #[test] + fn test_overwrite_extension() { + let mut state = ExecutionState::new(); + + // Set an extension + state.set_extension(TestExtension { value: 1 }); + + // Overwrite it + state.set_extension(TestExtension { value: 2 }); + + // Should have the new value + let ext = state.get_extension::(); + assert_eq!(ext.unwrap().value, 2); + } +} From 92671126f99bb9edb074ab54af76dc0360b77021 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 13:38:19 -0500 Subject: [PATCH 12/16] add test for conditional routing --- .../crates/otap/src/transform_processor.rs | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 0c02e8519c..172c65f97b 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -623,4 +623,116 @@ mod test { }) .validate(|_ctx| async move {}) } + + #[test] + fn test_conditional_route_to() { + // test ensure it will only operate on all signals + let runtime = TestRuntime::::new(); + let query = r#"logs + | if (severity_text == "ERROR") { + route_to "error_port" + } else if (severity_text == "INFO") { + route_to "info_port" + }"#; + let mut processor = try_create_with_opl_query(query, &runtime).expect("created processor"); + + let test_node_id = NodeId { + index: 1, + name: "test_node".into(), + }; + let (error_port_tx, error_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id.clone(), + PortName::from("error_port"), + Sender::Local(LocalSender::mpsc(error_port_tx)), + ) + .unwrap(); + let (info_port_tx, info_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id.clone(), + PortName::from("info_port"), + Sender::Local(LocalSender::mpsc(info_port_tx)), + ) + .unwrap(); + + fn assert_logs_records_equal(otap_batch: OtapArrowRecords, log_record: LogRecord) { + let result = otap_to_otlp(&otap_batch); + match result { + OtlpProtoMessage::Logs(logs) => { + assert_eq!( + &logs.resource_logs[0].scope_logs[0].log_records, + &[log_record] + ) + } + _ => panic!("unexpected result"), + } + } + + runtime + .set_processor(processor) + .run_test(|mut ctx| async move { + let error_log_record = LogRecord::build().severity_text("ERROR").finish(); + let info_log_record = LogRecord::build().severity_text("INFO").finish(); + let other_log_record = LogRecord::build().severity_text("DEBUG").finish(); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + vec![ + error_log_record.clone(), + info_log_record.clone(), + other_log_record.clone(), + ], + )], + )], + })); + let pdata = OtapPdata::new_default(input.clone().into()); + ctx.process(Message::PData(pdata)) + .await + .expect("no process error"); + + // check anything not routed get outputted to the default port + let out = ctx + .drain_pdata() + .await + .into_iter() + .map(OtapPdata::payload) + .map(OtapArrowRecords::try_from) + .map(Result::unwrap); + let default_result = out.into_iter().next().expect("one result"); + assert_logs_records_equal(default_result, other_log_record); + + // check error log record got routed to correct out pot + let mut routed = Vec::new(); + while let Ok(msg) = error_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + let (_context, payload) = routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_logs_records_equal(result, error_log_record); + } + _ => panic!("unexpected payload type"), + } + + // check error log record got routed to correct out pot + let mut routed = Vec::new(); + while let Ok(msg) = info_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + let (_context, payload) = routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_logs_records_equal(result, info_log_record); + } + _ => panic!("unexpected payload type"), + } + }) + .validate(|_ctx| async move {}) + } } From d2368a9848c8fdb13e6d49a8f709a1a14d85d7ed Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 13:49:12 -0500 Subject: [PATCH 13/16] fmt in opl --- .../crates/opl/src/parser/operator.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/rust/otap-dataflow/crates/opl/src/parser/operator.rs b/rust/otap-dataflow/crates/opl/src/parser/operator.rs index 63bbb64bda..305d31564d 100644 --- a/rust/otap-dataflow/crates/opl/src/parser/operator.rs +++ b/rust/otap-dataflow/crates/opl/src/parser/operator.rs @@ -3,10 +3,13 @@ use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, - DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, QueryLocation, - TransformExpression, OutputDataExpression, OutputExpression, StaticScalarExpression, + DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, + OutputDataExpression, OutputExpression, QueryLocation, StaticScalarExpression, + TransformExpression, +}; +use data_engine_parser_abstractions::{ + ParserError, parse_standard_string_literal, to_query_location, }; -use data_engine_parser_abstractions::{ParserError, parse_standard_string_literal, to_query_location}; use pest::iterators::Pair; use crate::parser::assignment::parse_assignment_expression; @@ -247,9 +250,9 @@ mod tests { use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, DiscardDataExpression, EqualToLogicalExpression, LogicalExpression, MutableValueExpression, - NotLogicalExpression, OutputDataExpression, OutputExpression, QueryLocation, ScalarExpression, SetTransformExpression, - SourceScalarExpression, StaticScalarExpression, StringScalarExpression, - TransformExpression, ValueAccessor, + NotLogicalExpression, OutputDataExpression, OutputExpression, QueryLocation, + ScalarExpression, SetTransformExpression, SourceScalarExpression, StaticScalarExpression, + StringScalarExpression, TransformExpression, ValueAccessor, }; use data_engine_parser_abstractions::{Parser, ParserOptions, ParserState}; use pest::Parser as _; From 8f3aac89dee454ed79ea007a0a6b0f43f8745023 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 17:40:22 -0500 Subject: [PATCH 14/16] Update rust/experimental/query_engine/expressions/src/data_expressions.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Laurent Quérel --- .../query_engine/expressions/src/data_expressions.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index 909df88af3..44f0ebaff2 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -352,10 +352,10 @@ impl Expression for OutputDataExpression { #[derive(Debug, Clone, PartialEq)] pub enum OutputExpression { /// Output data to a sink identified by name. - // Currently this contains a static string because it's the only way we handle identifying - // where to output the data. In the future we could support dynamic sink identified by a - // variable, result of a function call, or other some expression, at which point we can change - // this to contain the more general `StaticExpression`. + /// Currently this contains a static string because it's the only way we handle identifying + /// where to output the data. In the future we could support dynamic sink identified by a + /// variable, result of a function call, or other some expression, at which point we can change + /// this to contain the more general `StaticExpression`. NamedSink(StringScalarExpression), } From 3aea290e2fb0fea813114311978b942bd5360ca2 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 17:42:51 -0500 Subject: [PATCH 15/16] Fix bad grammar in docs in state.rs --- rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs index 810c7e38b1..40011d9bb1 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -21,8 +21,8 @@ use std::hash::{BuildHasherDefault, Hasher}; // other state needed for stateful stream processing. // // When the pipeline is executed without an ExecutionState being provided, a default one will be -// created. This means that anything added to this in the should be inexpensive to initialize in -// the default case. +// created. This means that anything added to this should be inexpensive to initialize in the +// default case. #[derive(Default)] pub struct ExecutionState { extensions: Option, From 16446e42bd6c26ed54f4d34be798a2f6fe3bb6b4 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Wed, 14 Jan 2026 17:55:34 -0500 Subject: [PATCH 16/16] avoid caching routed messages when pipeline fails --- rust/otap-dataflow/crates/otap/src/transform_processor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 172c65f97b..9affaf39a3 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -245,6 +245,10 @@ impl Processor for TransformProcessor { otap_batch.into() } Err(e) => { + // forward the routed messages in the event of a failure to avoid + // caching any batches that were routed before the pipeline fails + self.handle_routed_messages(effect_handler).await?; + self.metrics.msgs_transform_failed.inc(); return Err(EngineError::ProcessorError { processor: effect_handler.processor_id(),