diff --git a/rust/experimental/query_engine/engine-recordset/src/engine.rs b/rust/experimental/query_engine/engine-recordset/src/engine.rs index 47ebd513be..a5267cc874 100644 --- a/rust/experimental/query_engine/engine-recordset/src/engine.rs +++ b/rust/experimental/query_engine/engine-recordset/src/engine.rs @@ -316,6 +316,14 @@ fn process_record<'a, TRecord: Record + 'static>( ); break; } + DataExpression::Output(c) => { + execution_context.add_diagnostic_if_enabled( + RecordSetEngineDiagnosticLevel::Error, + c, + || "Output Expression not yet supported in record set engine".into(), + ); + break; + } } } diff --git a/rust/experimental/query_engine/expressions/src/data_expressions.rs b/rust/experimental/query_engine/expressions/src/data_expressions.rs index fdd1606786..44f0ebaff2 100644 --- a/rust/experimental/query_engine/expressions/src/data_expressions.rs +++ b/rust/experimental/query_engine/expressions/src/data_expressions.rs @@ -16,6 +16,9 @@ pub enum DataExpression { /// Conditional data expression. 
Conditional(ConditionalDataExpression), + + /// Output data expression + Output(OutputDataExpression), } impl DataExpression { @@ -28,6 +31,7 @@ impl DataExpression { DataExpression::Summary(s) => s.try_fold(scope), DataExpression::Transform(t) => t.try_fold(scope), DataExpression::Conditional(c) => c.try_fold(scope), + DataExpression::Output(o) => o.try_fold(scope), } } } @@ -39,6 +43,7 @@ impl Expression for DataExpression { DataExpression::Summary(s) => s.get_query_location(), DataExpression::Transform(t) => t.get_query_location(), DataExpression::Conditional(c) => c.get_query_location(), + DataExpression::Output(o) => o.get_query_location(), } } @@ -48,6 +53,7 @@ impl Expression for DataExpression { DataExpression::Summary(_) => "DataExpression(Summary)", DataExpression::Transform(_) => "DataExpression(Transform)", DataExpression::Conditional(_) => "DataExpression(Conditional)", + DataExpression::Output(_) => "DataExpression(Output)", } } @@ -57,6 +63,7 @@ impl Expression for DataExpression { DataExpression::Summary(s) => s.fmt_with_indent(f, indent), DataExpression::Transform(t) => t.fmt_with_indent(f, indent), DataExpression::Conditional(c) => c.fmt_with_indent(f, indent), + DataExpression::Output(o) => o.fmt_with_indent(f, indent), } } } @@ -295,3 +302,89 @@ impl ConditionalDataExpressionBranch { &self.expressions } } + +/// Data expression representing an operation that emits data to a sink. +#[derive(Debug, Clone, PartialEq)] +pub struct OutputDataExpression { + query_location: QueryLocation, + output: OutputExpression, +} + +impl OutputDataExpression { + pub fn new(query_location: QueryLocation, output: OutputExpression) -> Self { + Self { + query_location, + output, + } + } + + pub fn get_output(&self) -> &OutputExpression { + &self.output + } + + pub fn try_fold(&mut self, _scope: &PipelineResolutionScope) -> Result<(), ExpressionError> { + // No folding currently supported for output expressions. 
+ Ok(()) + } +} + +impl Expression for OutputDataExpression { + fn get_query_location(&self) -> &QueryLocation { + &self.query_location + } + + fn get_name(&self) -> &'static str { + "OutputDataExpression" + } + + fn fmt_with_indent(&self, f: &mut std::fmt::Formatter<'_>, indent: &str) -> std::fmt::Result { + writeln!(f, "Output:")?; + write!(f, "{indent}└── ")?; + match &self.output { + OutputExpression::NamedSink(expr) => { + expr.fmt_with_indent(f, format!("{indent} ").as_str()) + } + } + } +} + +/// Expression representing an operation that emits data to a sink. +#[derive(Debug, Clone, PartialEq)] +pub enum OutputExpression { + /// Output data to a sink identified by name. + /// Currently this contains a static string because it's the only way we handle identifying + /// where to output the data. In the future we could support a dynamic sink identified by a + /// variable, the result of a function call, or some other expression, at which point we can change + /// this to contain the more general `StaticExpression`. 
+ NamedSink(StringScalarExpression), +} + +#[cfg(test)] +mod test { + use super::*; + use std::fmt; + + // Helper struct to test fmt_with_indent by implementing Display + struct DisplayWrapper<'a, T: Expression>(&'a T, &'a str); + + impl<'a, T: Expression> fmt::Display for DisplayWrapper<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt_with_indent(f, self.1) + } + } + + #[test] + fn test_output_expression_fmt_with_indent() { + let string_expr = StringScalarExpression::new(QueryLocation::new_fake(), "sink_name"); + let output_expr = OutputExpression::NamedSink(string_expr.clone()); + let output_data_expr = OutputDataExpression::new(QueryLocation::new_fake(), output_expr); + let output = format!("{}", DisplayWrapper(&output_data_expr, "")); + assert_eq!( + output, + format!( + "Output:\n\ + └── {string_expr:?}\n" + ) + ); + } +} diff --git a/rust/otap-dataflow/crates/engine/src/control.rs b/rust/otap-dataflow/crates/engine/src/control.rs index 1807c15f06..52bae90eaa 100644 --- a/rust/otap-dataflow/crates/engine/src/control.rs +++ b/rust/otap-dataflow/crates/engine/src/control.rs @@ -57,7 +57,7 @@ impl From for f64 { } } -/// Standard context values hold three caller-specified fields. The +/// Standard context values hold three caller-specified fields. The /// size is arbitrary, but shouldn't be larger than needed by /// callers. For example: retry count, sequence and generation /// numbers, deadline, num_items, etc. diff --git a/rust/otap-dataflow/crates/engine/src/testing/processor.rs b/rust/otap-dataflow/crates/engine/src/testing/processor.rs index 0794e61387..aeed47ec1b 100644 --- a/rust/otap-dataflow/crates/engine/src/testing/processor.rs +++ b/rust/otap-dataflow/crates/engine/src/testing/processor.rs @@ -118,6 +118,10 @@ impl ValidateContext { } } +/// The name of the out_port that will be configured automatically on the [`ProcessorWrapper`] by +/// the [`TestRuntime`]. 
+pub const TEST_OUT_PORT_NAME: &str = "out"; + /// A test runtime for simplifying processor tests. /// /// This structure encapsulates the common setup logic needed for testing processors, @@ -224,7 +228,7 @@ impl TestRuntime { // Set the output sender for the processor let _ = processor.set_pdata_sender( test_node(self.config().name.clone()), - "out".into(), + TEST_OUT_PORT_NAME.into(), pdata_sender, ); diff --git a/rust/otap-dataflow/crates/opl/src/opl.pest b/rust/otap-dataflow/crates/opl/src/opl.pest index f97cff4197..34d7ef8cf2 100644 --- a/rust/otap-dataflow/crates/opl/src/opl.pest +++ b/rust/otap-dataflow/crates/opl/src/opl.pest @@ -164,9 +164,14 @@ where_operator_call = { "where" ~ expression } +route_to_operator_call = { + "route_to" ~ string_literal +} + operator_call = { set_operator_call | if_else_operator_call + | route_to_operator_call | where_operator_call } diff --git a/rust/otap-dataflow/crates/opl/src/parser/operator.rs b/rust/otap-dataflow/crates/opl/src/parser/operator.rs index 5d0e86265c..305d31564d 100644 --- a/rust/otap-dataflow/crates/opl/src/parser/operator.rs +++ b/rust/otap-dataflow/crates/opl/src/parser/operator.rs @@ -3,10 +3,13 @@ use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, - DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, QueryLocation, + DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, + OutputDataExpression, OutputExpression, QueryLocation, StaticScalarExpression, TransformExpression, }; -use data_engine_parser_abstractions::{ParserError, to_query_location}; +use data_engine_parser_abstractions::{ + ParserError, parse_standard_string_literal, to_query_location, +}; use pest::iterators::Pair; use crate::parser::assignment::parse_assignment_expression; @@ -20,8 +23,9 @@ pub(crate) fn parse_operator_call( ) -> Result<(), ParserError> { for rule in rule.into_inner() { match rule.as_rule() { - Rule::set_operator_call => 
parse_set_operator_call(rule, pipeline_builder)?, Rule::if_else_operator_call => parse_if_else_opeartor_call(rule, pipeline_builder)?, + Rule::route_to_operator_call => parse_route_to_operator_call(rule, pipeline_builder)?, + Rule::set_operator_call => parse_set_operator_call(rule, pipeline_builder)?, Rule::where_operator_call => parse_where_operator_call(rule, pipeline_builder)?, invalid_rule => { let query_location = to_query_location(&rule); @@ -37,6 +41,41 @@ pub(crate) fn parse_operator_call( Ok(()) } +pub(crate) fn parse_route_to_operator_call( + operator_call_rule: Pair<'_, Rule>, + pipeline_builder: &mut dyn PipelineBuilder, +) -> Result<(), ParserError> { + let query_location = to_query_location(&operator_call_rule); + if let Some(rule) = operator_call_rule.into_inner().next() { + let rule_query_location = to_query_location(&rule); + let dest = match rule.as_rule() { + Rule::string_literal => match parse_standard_string_literal(rule) { + StaticScalarExpression::String(string) => string, + invalid_expr => { + return Err(ParserError::SyntaxError( + rule_query_location, + format!("Expected static string literal, found {:?}", invalid_expr), + )); + } + }, + invalid_rule => { + let query_location = to_query_location(&rule); + return Err(invalid_child_rule_error( + query_location, + Rule::string_literal, + invalid_rule, + )); + } + }; + + let output_expr = + OutputDataExpression::new(query_location, OutputExpression::NamedSink(dest)); + pipeline_builder.push_data_expression(DataExpression::Output(output_expr)); + } + + Ok(()) +} + pub(crate) fn parse_set_operator_call( operator_call_rule: Pair<'_, Rule>, pipeline_builder: &mut dyn PipelineBuilder, @@ -211,9 +250,9 @@ mod tests { use data_engine_expressions::{ ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression, DiscardDataExpression, EqualToLogicalExpression, LogicalExpression, MutableValueExpression, - NotLogicalExpression, QueryLocation, ScalarExpression, SetTransformExpression, - 
SourceScalarExpression, StaticScalarExpression, StringScalarExpression, - TransformExpression, ValueAccessor, + NotLogicalExpression, OutputDataExpression, OutputExpression, QueryLocation, + ScalarExpression, SetTransformExpression, SourceScalarExpression, StaticScalarExpression, + StringScalarExpression, TransformExpression, ValueAccessor, }; use data_engine_parser_abstractions::{Parser, ParserOptions, ParserState}; use pest::Parser as _; @@ -223,6 +262,29 @@ mod tests { use crate::parser::pest::OplPestParser; use crate::parser::{OplParser, Rule}; + #[test] + fn test_route_to_operator_call() { + let query = "route_to \"test_out_port\""; + let mut state = ParserState::new(query); + let parse_result = OplPestParser::parse(Rule::operator_call, query).unwrap(); + assert_eq!(parse_result.len(), 1); + let rule = parse_result.into_iter().next().unwrap(); + parse_operator_call(rule, &mut state).unwrap(); + let result = state.build().unwrap(); + let expressions = result.get_expressions(); + assert_eq!(expressions.len(), 1); + + let expected = DataExpression::Output(OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_out_port", + )), + )); + + assert_eq!(&expressions[0], &expected); + } + #[test] fn test_parse_set_operator_call() { let query = "set severity_text = \"ERROR\""; diff --git a/rust/otap-dataflow/crates/otap/Cargo.toml b/rust/otap-dataflow/crates/otap/Cargo.toml index e58fc42bc6..3a2951a6cd 100644 --- a/rust/otap-dataflow/crates/otap/Cargo.toml +++ b/rust/otap-dataflow/crates/otap/Cargo.toml @@ -48,6 +48,7 @@ otap-df-engine = { path = "../engine" } otap-df-engine-macros = { path = "../engine-macros" } otap-df-channel = { path = "../channel" } otap-df-config = { path = "../config" } +otap-df-opl = { path = "../opl" } otap-df-pdata = { path = "../pdata" } otap-df-query-engine = { path = "../query-engine" } otap-df-telemetry = { path = "../telemetry" } diff --git 
a/rust/otap-dataflow/crates/otap/src/transform_processor.rs b/rust/otap-dataflow/crates/otap/src/transform_processor.rs index 6deda2cf17..9affaf39a3 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor.rs @@ -31,18 +31,24 @@ use otap_df_engine::{ node::NodeId, processor::ProcessorWrapper, }; +use otap_df_opl::parser::OplParser; use otap_df_pdata::{OtapArrowRecords, OtapPayload}; -use otap_df_query_engine::pipeline::Pipeline; +use otap_df_query_engine::pipeline::{Pipeline, routing::RouterExtType, state::ExecutionState}; use otap_df_telemetry::metrics::MetricSet; use serde_json::Value; -use crate::{OTAP_PROCESSOR_FACTORIES, pdata::OtapPdata}; +use crate::{ + OTAP_PROCESSOR_FACTORIES, + pdata::{Context, OtapPdata}, + transform_processor::routing::RouterImpl, +}; -use self::config::Config; +use self::config::{Config, Query}; use self::metrics::Metrics; mod config; mod metrics; +mod routing; /// URN for the TransformProcessor pub const TRANSFORM_PROCESSOR_URN: &str = "urn:otel:transform:processor"; @@ -50,6 +56,7 @@ pub const TRANSFORM_PROCESSOR_URN: &str = "urn:otel:transform:processor"; /// Opentelemetry Processing Language Processor pub struct TransformProcessor { pipeline: Pipeline, + execution_state: ExecutionState, signal_scope: SignalScope, metrics: MetricSet, } @@ -100,20 +107,27 @@ impl TransformProcessor { // TODO we should pass some context to the parser so we can determine if there are valid // identifiers when checking the config: // https://github.com/open-telemetry/otel-arrow/issues/1530 - let pipeline_expr = KqlParser::parse(&config.query) - .map_err(|e| ConfigError::InvalidUserConfig { - error: format!("Could not parse TransformProcessor query: {e:?}"), - })? 
- .pipeline; + let pipeline_expr = match &config.query { + Query::KqlQuery(query) => KqlParser::parse(query), + Query::OplQuery(query) => OplParser::parse(query), + } + .map_err(|e| ConfigError::InvalidUserConfig { + error: format!("Could not parse TransformProcessor query: {e:?}"), + })? + .pipeline; // TODO: it would be nice if we could validate that the pipeline expr is supported by the // query engine here. Currently, validation happens lazily when the first batch is seen. // https://github.com/open-telemetry/otel-arrow/issues/1634 + let mut execution_state = ExecutionState::new(); + execution_state.set_extension::(Box::new(RouterImpl::new())); + Ok(Self { signal_scope: SignalScope::try_from(&pipeline_expr)?, pipeline: Pipeline::new(pipeline_expr), metrics: pipeline_ctx.register_metrics::(), + execution_state, }) } @@ -124,6 +138,46 @@ impl TransformProcessor { SignalScope::Signal(signal_type) => signal_type == pdata.signal_type(), } } + + async fn handle_routed_messages( + &mut self, + effect_handler: &mut EffectHandler, + ) -> Result<(), EngineError> { + let router_impl = self + .execution_state + .get_extension_mut::() + .and_then(|router| router.as_any_mut().downcast_mut::()) + .ok_or_else(|| EngineError::ProcessorError { + processor: effect_handler.processor_id(), + kind: ProcessorErrorKind::Other, + source_detail: "Router not found in pipeline exec state".into(), + error: "Routing error:".into(), + })?; + + for (route_name, otap_batch) in router_impl.routed.drain(..) { + let payload = OtapPayload::OtapArrowRecords(otap_batch); + // TODO -- need to properly handle Ack/Nack here by creating a new context, subscribing + // interests, and juggling the incoming/outgoing contexts & Ack/Nack messages correctly + let pdata = OtapPdata::new(Context::default(), payload); + + // Find the port name that matches the route name. 
+ let port_name = effect_handler + .connected_ports() + .iter() + .find(|p| p.as_ref() == route_name.as_str()) + .ok_or_else(|| EngineError::ProcessorError { + processor: effect_handler.processor_id(), + kind: ProcessorErrorKind::Transport, + error: "Routing error: ".into(), + source_detail: format!("out_port name {} not configured", route_name), + })? + .clone(); + + effect_handler.send_message_to(port_name, pdata).await?; + } + + Ok(()) + } } /// Factory for creating [`TransformProcessor`] during plugin registration @@ -180,12 +234,21 @@ impl Processor for TransformProcessor { } else { let mut otap_batch: OtapArrowRecords = payload.try_into()?; otap_batch.decode_transport_optimized_ids()?; - match self.pipeline.execute(otap_batch).await { + match self + .pipeline + .execute_with_state(otap_batch, &mut self.execution_state) + .await + { Ok(otap_batch) => { self.metrics.msgs_transformed.inc(); + self.handle_routed_messages(effect_handler).await?; otap_batch.into() } Err(e) => { + // forward the routed messages in the event of a failure to avoid + // caching any batches that were routed before the pipeline fails + self.handle_routed_messages(effect_handler).await?; + self.metrics.msgs_transform_failed.inc(); return Err(EngineError::ProcessorError { processor: effect_handler.processor_id(), @@ -212,15 +275,19 @@ mod test { use super::*; use serde_json::json; - use otap_df_config::node::NodeUserConfig; + use otap_df_config::{PortName, node::NodeUserConfig}; use otap_df_engine::{ context::ControllerContext, + local::message::LocalSender, + message::Sender, + node::NodeWithPDataSender, testing::{ - processor::{TestContext, TestRuntime}, + processor::{TEST_OUT_PORT_NAME, TestContext, TestRuntime}, test_node, }, }; use otap_df_pdata::{ + otap::Logs, proto::{ OtlpProtoMessage, opentelemetry::{ @@ -237,14 +304,13 @@ mod test { use crate::pdata::OtapPdata; - fn try_create_with_query( - query: &str, + fn try_create_with_config( + config: Value, runtime: &TestRuntime, ) -> 
Result, ConfigError> { let mut node_config = NodeUserConfig::new_processor_config(TRANSFORM_PROCESSOR_URN); - node_config.config = json!({ - "query": query - }); + node_config.config = config; + node_config.default_out_port = Some(TEST_OUT_PORT_NAME.into()); let telemetry_registry_handle = runtime.metrics_registry(); let controller_context = ControllerContext::new(telemetry_registry_handle); @@ -259,10 +325,24 @@ mod test { ) } + fn try_create_with_kql_query( + query: &str, + runtime: &TestRuntime, + ) -> Result, ConfigError> { + try_create_with_config(json!({ "kql_query": query }), runtime) + } + + fn try_create_with_opl_query( + query: &str, + runtime: &TestRuntime, + ) -> Result, ConfigError> { + try_create_with_config(json!({ "opl_query": query }), runtime) + } + #[test] fn test_unparsable_query_is_config_time_error() { let runtime = TestRuntime::::new(); - match try_create_with_query("logs | invalid operator", &runtime) { + match try_create_with_kql_query("logs | invalid operator", &runtime) { Err(e) => { assert!( e.to_string() @@ -281,7 +361,7 @@ mod test { let telemetry_registry = runtime.metrics_registry(); let metrics_reporter = runtime.metrics_reporter(); let query = "logs | where severity_text == \"ERROR\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -416,7 +496,7 @@ mod test { // test ensure it will only operate on traces, but ignores other signals let runtime = TestRuntime::::new(); let query = "traces | where name == \"foo\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -451,7 +531,7 @@ mod test { // test ensure it will only operate on all signals let 
runtime = TestRuntime::::new(); let query = "signals | where name == \"foo\""; - let processor = try_create_with_query(query, &runtime).expect("created processor"); + let processor = try_create_with_kql_query(query, &runtime).expect("created processor"); runtime .set_processor(processor) .run_test(|mut ctx| async move { @@ -480,4 +560,183 @@ mod test { }) .validate(|_ctx| async move {}) } + + #[test] + fn test_simple_route_to() { + // test ensure it will only operate on all signals + let runtime = TestRuntime::::new(); + let query = "logs | route_to \"test_port\""; + let mut processor = try_create_with_opl_query(query, &runtime).expect("created processor"); + + let test_node_id = NodeId { + index: 1, + name: "test_node".into(), + }; + let (test_port_tx, test_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id, + PortName::from("test_port"), + Sender::Local(LocalSender::mpsc(test_port_tx)), + ) + .unwrap(); + + runtime + .set_processor(processor) + .run_test(|mut ctx| async move { + let input = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + vec![LogRecord::build().severity_text("ERROR").finish()], + )], + )], + })); + let pdata = OtapPdata::new_default(input.clone().into()); + ctx.process(Message::PData(pdata)) + .await + .expect("no process error"); + + let out = ctx + .drain_pdata() + .await + .into_iter() + .map(OtapPdata::payload) + .map(OtapArrowRecords::try_from) + .map(Result::unwrap); + let result = out.into_iter().next().expect("one result"); + + // expect we got an empty batch: + assert_eq!(result, OtapArrowRecords::Logs(Logs::default())); + // TODO when we support Ack/Nack here assert on the context of this message + + let mut routed = Vec::new(); + while let Ok(msg) = test_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + let (_context, payload) = 
routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_eq!(result, input) + } + _ => panic!("unexpected payload type"), + } + // TODO when we support Ack/Nack here assert on routed context + }) + .validate(|_ctx| async move {}) + } + + #[test] + fn test_conditional_route_to() { + // test ensure it will only operate on all signals + let runtime = TestRuntime::::new(); + let query = r#"logs + | if (severity_text == "ERROR") { + route_to "error_port" + } else if (severity_text == "INFO") { + route_to "info_port" + }"#; + let mut processor = try_create_with_opl_query(query, &runtime).expect("created processor"); + + let test_node_id = NodeId { + index: 1, + name: "test_node".into(), + }; + let (error_port_tx, error_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id.clone(), + PortName::from("error_port"), + Sender::Local(LocalSender::mpsc(error_port_tx)), + ) + .unwrap(); + let (info_port_tx, info_port_rx) = otap_df_channel::mpsc::Channel::new(10); + processor + .set_pdata_sender( + test_node_id.clone(), + PortName::from("info_port"), + Sender::Local(LocalSender::mpsc(info_port_tx)), + ) + .unwrap(); + + fn assert_logs_records_equal(otap_batch: OtapArrowRecords, log_record: LogRecord) { + let result = otap_to_otlp(&otap_batch); + match result { + OtlpProtoMessage::Logs(logs) => { + assert_eq!( + &logs.resource_logs[0].scope_logs[0].log_records, + &[log_record] + ) + } + _ => panic!("unexpected result"), + } + } + + runtime + .set_processor(processor) + .run_test(|mut ctx| async move { + let error_log_record = LogRecord::build().severity_text("ERROR").finish(); + let info_log_record = LogRecord::build().severity_text("INFO").finish(); + let other_log_record = LogRecord::build().severity_text("DEBUG").finish(); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + 
InstrumentationScope::default(), + vec![ + error_log_record.clone(), + info_log_record.clone(), + other_log_record.clone(), + ], + )], + )], + })); + let pdata = OtapPdata::new_default(input.clone().into()); + ctx.process(Message::PData(pdata)) + .await + .expect("no process error"); + + // check that anything not routed is output to the default port + let out = ctx + .drain_pdata() + .await + .into_iter() + .map(OtapPdata::payload) + .map(OtapArrowRecords::try_from) + .map(Result::unwrap); + let default_result = out.into_iter().next().expect("one result"); + assert_logs_records_equal(default_result, other_log_record); + + // check error log record got routed to correct out port + let mut routed = Vec::new(); + while let Ok(msg) = error_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + let (_context, payload) = routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_logs_records_equal(result, error_log_record); + } + _ => panic!("unexpected payload type"), + } + + // check info log record got routed to correct out port + let mut routed = Vec::new(); + while let Ok(msg) = info_port_rx.try_recv() { + routed.push(msg); + } + assert_eq!(routed.len(), 1); + let (_context, payload) = routed.pop().unwrap().into_parts(); + match payload { + OtapPayload::OtapArrowRecords(result) => { + assert_logs_records_equal(result, info_log_record); + } + _ => panic!("unexpected payload type"), + } + }) + .validate(|_ctx| async move {}) + } } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs index 45147198e3..3742439518 100644 --- a/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/config.rs @@ -6,7 +6,14 @@ use serde::Deserialize; /// Configuration for the [`TransformProcessor`](super::TransformProcessor) #[derive(Debug, Deserialize)] pub 
struct Config { - /// the query that defines the transformation to be applied - pub query: String, + #[serde(flatten)] + pub query: Query, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Query { + KqlQuery(String), + OplQuery(String), // TODO - add section to allow transforms to be specified in OTTL } diff --git a/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs new file mode 100644 index 0000000000..df5fdfe959 --- /dev/null +++ b/rust/otap-dataflow/crates/otap/src/transform_processor/routing.rs @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; + +use async_trait::async_trait; +use otap_df_pdata::OtapArrowRecords; +use otap_df_query_engine::{ + error::Result, + pipeline::routing::{RouteName, Router}, +}; + +/// implementation of [`Router`] used by [`TransformProcessor`] +pub(super) struct RouterImpl { + pub routed: Vec<(RouteName, OtapArrowRecords)>, +} + +impl RouterImpl { + pub fn new() -> Self { + Self { routed: Vec::new() } + } +} + +#[async_trait(?Send)] +impl Router for RouterImpl { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + async fn send(&mut self, route_name: RouteName, otap_batch: OtapArrowRecords) -> Result<()> { + self.routed.push((route_name, otap_batch)); + + Ok(()) + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs index 8b71fc2b96..17b92f4d1d 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline.rs @@ -4,8 +4,6 @@ //! This module defines the top-level API for executing data transformation pipelines on //! streaming telemetry data in the OTAP columnar format. 
-use std::sync::Arc; - use arrow::compute::concat_batches; use async_trait::async_trait; use data_engine_expressions::PipelineExpression; @@ -18,9 +16,11 @@ use datafusion::physical_plan::streaming::PartitionStream; use datafusion::physical_plan::{ExecutionPlan, execute_stream}; use otap_df_pdata::OtapArrowRecords; use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; +use std::sync::Arc; use crate::error::{Error, Result}; use crate::pipeline::planner::PipelinePlanner; +use crate::pipeline::state::ExecutionState; use crate::table::RecordBatchPartitionStream; mod attributes; @@ -29,6 +29,9 @@ mod filter; mod functions; mod planner; +pub mod routing; +pub mod state; + /// A stage in the pipeline. /// /// Used for the physical execution of one or more pipeline expressions. Stages are compiled @@ -55,6 +58,7 @@ pub trait PipelineStage { session_context: &SessionContext, config_options: &ConfigOptions, task_context: Arc, + exec_options: &mut ExecutionState, ) -> Result; } @@ -83,6 +87,7 @@ impl PipelineStage for DataFusionPipelineStage { _session_context: &SessionContext, _config_options: &ConfigOptions, task_context: Arc, + _execution_options: &mut ExecutionState, ) -> Result { let rb = match otap_batch.get(self.payload_type) { Some(rb) => rb, @@ -199,6 +204,18 @@ impl Pipeline { /// Execute the pipeline on a batch of telemetry data. /// + /// # Arguments + /// - `otap_batch`: The input telemetry data to process + /// + /// # Returns + /// The transformed telemetry data after all stages have executed + pub async fn execute(&mut self, otap_batch: OtapArrowRecords) -> Result { + let mut exec_state = ExecutionState::default(); + self.execute_with_state(otap_batch, &mut exec_state).await + } + + /// Execute the pipeline on a batch of telemetry data, using the provided execution state. + /// /// Any query planning happens during the first call to execute, including setting up any /// DataFusion SessionContext, TaskContext, etc. 
Subsequent calls will not have to redo /// the full planning, although individual stages may do light re-plannings to adapt to @@ -206,10 +223,15 @@ impl Pipeline { /// /// # Arguments /// - `otap_batch`: The input telemetry data to process + /// - `exec_state`: The execution state to use for the pipeline execution /// /// # Returns /// The transformed telemetry data after all stages have executed - pub async fn execute(&mut self, mut otap_batch: OtapArrowRecords) -> Result { + pub async fn execute_with_state( + &mut self, + mut otap_batch: OtapArrowRecords, + exec_state: &mut ExecutionState, + ) -> Result { // lazily plan the pipeline if have not already done so if self.planned_pipeline.is_none() { let session_ctx = Self::create_session_context(); @@ -231,6 +253,7 @@ impl Pipeline { &pipeline.session_context, pipeline.config_options.as_ref(), pipeline.task_context.clone(), + exec_state, ) .await?; } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs index 860ee1d954..8207d5641b 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/attributes.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use crate::error::{Error, Result}; use crate::pipeline::PipelineStage; use crate::pipeline::planner::AttributesIdentifier; +use crate::pipeline::state::ExecutionState; /// This pipeline stage can be used to rename and delete attributes according to the transformation /// specified by the [`AttributesTransform`] @@ -40,6 +41,7 @@ impl PipelineStage for AttributeTransformPipelineStage { _session_context: &SessionContext, _config_options: &ConfigOptions, _task_context: Arc, + _exec_state: &mut ExecutionState, ) -> Result { let attrs_payload_type = match self.attrs_id { AttributesIdentifier::Root => match otap_batch { diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs 
b/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs index 3caee196d7..a32a55a796 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/conditional.rs @@ -18,6 +18,7 @@ use otap_df_pdata::otap::{Logs, Metrics, Traces}; use crate::error::Result; use crate::pipeline::filter::{Composite, FilterExec, filter_otap_batch}; +use crate::pipeline::state::ExecutionState; use crate::pipeline::{BoxedPipelineStage, PipelineStage}; /// This [`PipelineStage`] implementation will conditionally apply child pipeline stages on rows @@ -88,6 +89,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx: &SessionContext, config_options: &ConfigOptions, task_context: Arc, + exec_state: &mut ExecutionState, ) -> Result { let root_batch = match otap_batch.root_record_batch() { Some(root_batch) => root_batch, @@ -146,6 +148,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx, config_options, task_context.clone(), + exec_state, ) .await?; } @@ -167,6 +170,7 @@ impl PipelineStage for ConditionalPipelineStage { session_ctx, config_options, task_context.clone(), + exec_state, ) .await?; } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs index d954ad4b35..54387536ce 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs @@ -44,6 +44,7 @@ use crate::pipeline::planner::{ AttributesIdentifier, BinaryArg, ColumnAccessor, try_attrs_value_filter_from_literal, try_static_scalar_to_attr_literal, try_static_scalar_to_literal_for_column, }; +use crate::pipeline::state::ExecutionState; pub mod optimize; @@ -1711,6 +1712,7 @@ impl PipelineStage for FilterPipelineStage { session_context: &SessionContext, _config_options: &ConfigOptions, _task_context: Arc, + _exec_state: &mut ExecutionState, ) -> Result { if 
otap_batch.root_record_batch().is_none() { // if batch is empty, no filtering to do diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs index 75a7413af0..d789efb3f6 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs @@ -6,8 +6,9 @@ use data_engine_expressions::{ BooleanValue, DataExpression, DateTimeValue, DoubleValue, Expression, IntegerValue, LogicalExpression, MapSelector, MoveTransformExpression, MutableValueExpression, - PipelineExpression, ReduceMapTransformExpression, RenameMapKeysTransformExpression, - ScalarExpression, StaticScalarExpression, StringValue, TransformExpression, ValueAccessor, + OutputExpression, PipelineExpression, ReduceMapTransformExpression, + RenameMapKeysTransformExpression, ScalarExpression, StaticScalarExpression, StringValue, + TransformExpression, ValueAccessor, }; use datafusion::logical_expr::{BinaryExpr, Expr, Operator, col, lit}; use datafusion::prelude::{SessionContext, lit_timestamp_nano}; @@ -22,6 +23,7 @@ use crate::pipeline::attributes::AttributeTransformPipelineStage; use crate::pipeline::conditional::{ConditionalPipelineStage, ConditionalPipelineStageBranch}; use crate::pipeline::filter::optimize::AttrsFilterCombineOptimizerRule; use crate::pipeline::filter::{Composite, FilterExec, FilterPipelineStage, FilterPlan}; +use crate::pipeline::routing::RouteToPipelineStage; use crate::pipeline::{BoxedPipelineStage, PipelineStage}; /// Converts an pipeline expression (AST) into a series of executable pipeline stages. 
@@ -136,6 +138,12 @@ impl PipelinePlanner { Ok(vec![Box::new(pipeline_stage)]) } + DataExpression::Output(output_expr) => match output_expr.get_output() { + OutputExpression::NamedSink(name) => { + Ok(vec![Box::new(RouteToPipelineStage::new(name.get_value()))]) + } + }, + // TODO support other DataExpressions other => Err(Error::NotYetSupportedError { message: format!("data expression not yet supported {}", other.get_name()), diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs new file mode 100644 index 0000000000..35d235afe9 --- /dev/null +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/routing.rs @@ -0,0 +1,210 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Routing for columnar query engine. +//! +//! This module contains the pipeline stage that emits OTAP batches to some destination. +//! The routing implementation is customizable, using the `Router` trait which the +//! pipeline stage implementation will use to send the data to the appropriate route. + +use std::any::Any; +use std::rc::Rc; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::config::ConfigOptions; +use datafusion::execution::TaskContext; +use datafusion::execution::context::SessionContext; +use otap_df_pdata::OtapArrowRecords; +use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType; + +use crate::error::{Error, Result}; +use crate::pipeline::PipelineStage; +use crate::pipeline::state::ExecutionState; + +/// A trait for routing OTAP (OpenTelemetry Arrow Protocol) batch records to some destination. +/// +/// The pipeline stage that handles routing will look for an implementation of this trait +/// in the `ExecutionState` extensions, and invoke it to send the data to the appropriate route. +/// +/// The trait also provides methods to allow down-casting to concrete implementations. 
This means +/// the execution state can own the router as a trait object, but the pipeline caller can retrieve +/// it, downcast it to a concrete type, and inspect any state it may have or call other methods. +/// This is useful for implementations that buffer batches before sending them. +/// +#[async_trait(?Send)] +pub trait Router { + /// returns a reference as `Any` for down-casting + fn as_any(&self) -> &dyn Any; + + /// returns a mutable reference as `Any` for down-casting + fn as_any_mut(&mut self) -> &mut dyn Any; + + /// Send OTAP batch to the specified route. + async fn send(&mut self, route_name: RouteName, otap_batch: OtapArrowRecords) -> Result<()>; +} + +/// Route name type +// `Rc` is used here for `Router` implementations that buffer batches to route, so they could own the +// route name without having to clone a string +pub type RouteName = Rc; + +/// Extension type for `Router` implementations used by this pipeline stage. +/// +/// For callers that invoke pipelines supporting routing outputs, this type is used to provide a reference +/// to the `Router` implementation in the `ExecutionState` extensions. +pub type RouterExtType = Box; + +/// [`PipelineStage`] that routes OTAP batches to a specified route. +/// +/// This stage looks for a `Router` implementation in the `ExecutionState` extensions, and invokes +/// it to send the batch to the specified route. Executors of [`Pipeline`s](super::Pipeline) that +/// include this stage must ensure that a `Router` is set in the execution state. +pub struct RouteToPipelineStage { + route_name: RouteName, +} + +impl RouteToPipelineStage { + /// Create a new `RouteToPipelineStage` that routes to the specified route name. 
+ #[must_use] + pub fn new(route_name: &str) -> Self { + Self { + route_name: route_name.to_string().into(), + } + } +} + +#[async_trait(?Send)] +impl PipelineStage for RouteToPipelineStage { + async fn execute( + &mut self, + otap_batch: OtapArrowRecords, + _session_context: &SessionContext, + _config_options: &ConfigOptions, + _task_context: Arc, + exec_state: &mut ExecutionState, + ) -> Result { + let root_payload_type = otap_batch.root_payload_type(); + match exec_state.get_extension_mut::() { + Some(router) => { + router.send(self.route_name.clone(), otap_batch).await?; + } + None => { + return Err(Error::ExecutionError { + cause: "No router extension found in execution state".to_string(), + }); + } + } + + // emit empty batch + Ok(match root_payload_type { + ArrowPayloadType::Spans => OtapArrowRecords::Traces(Default::default()), + ArrowPayloadType::Logs => OtapArrowRecords::Logs(Default::default()), + _ => OtapArrowRecords::Metrics(Default::default()), + }) + } +} + +#[cfg(test)] +mod test { + use data_engine_expressions::{ + DataExpression, OutputDataExpression, OutputExpression, PipelineExpressionBuilder, + QueryLocation, StringScalarExpression, + }; + use otap_df_pdata::otap::Logs; + + use crate::pipeline::Pipeline; + + use super::*; + + struct TestRouter { + routed: Vec<(RouteName, OtapArrowRecords)>, + } + + #[async_trait(?Send)] + impl Router for TestRouter { + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + async fn send( + &mut self, + route_name: RouteName, + otap_batch: OtapArrowRecords, + ) -> Result<()> { + self.routed.push((route_name, otap_batch)); + Ok(()) + } + } + + #[tokio::test] + async fn test_route_to_pipeline_stage() { + let output_expr = OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_sink", + )), + ); + let pipeline_expr = PipelineExpressionBuilder::new("test") + 
.with_expressions(vec![DataExpression::Output(output_expr)]) + .build() + .unwrap(); + let mut pipeline = Pipeline::new(pipeline_expr); + + let mut exec_state = ExecutionState::new(); + let test_router = TestRouter { routed: vec![] }; + exec_state.set_extension::>(Box::new(test_router)); + + // TODO maybe not test with empty so we can make some verifications of the data + let otap_batch = OtapArrowRecords::Logs(Logs::default()); + let result = pipeline + .execute_with_state(otap_batch, &mut exec_state) + .await + .unwrap(); + let empty_batch = OtapArrowRecords::Logs(Logs::default()); + assert_eq!(result, empty_batch); + + let router = exec_state.get_extension_mut::>().unwrap(); + + match router.as_any_mut().downcast_mut::() { + Some(test_router) => { + assert_eq!(test_router.routed.len(), 1); + assert_eq!(test_router.routed[0].0.as_str(), "test_sink"); + } + None => panic!("Failed to downcast router to TestRouter"), + } + } + + #[tokio::test] + async fn test_route_to_pipeline_stage_no_router() { + let output_expr = OutputDataExpression::new( + QueryLocation::new_fake(), + OutputExpression::NamedSink(StringScalarExpression::new( + QueryLocation::new_fake(), + "test_sink", + )), + ); + let pipeline_expr = PipelineExpressionBuilder::new("test") + .with_expressions(vec![DataExpression::Output(output_expr)]) + .build() + .unwrap(); + let mut pipeline = Pipeline::new(pipeline_expr); + let mut exec_state = ExecutionState::new(); + let otap_batch = OtapArrowRecords::Logs(Logs::default()); + let result = pipeline + .execute_with_state(otap_batch, &mut exec_state) + .await; + + match result { + Err(Error::ExecutionError { cause }) => { + assert_eq!(cause, "No router extension found in execution state"); + } + _ => panic!("Expected ExecutionError"), + } + } +} diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs new file mode 100644 index 0000000000..40011d9bb1 --- /dev/null +++ 
b/rust/otap-dataflow/crates/query-engine/src/pipeline/state.rs @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//! Execution state for columnar query engine. + +use std::any::{Any, TypeId}; +use std::collections::HashMap; +use std::hash::{BuildHasherDefault, Hasher}; + +/// Additional state that may be carried along during the execution of a pipeline. +/// +/// This can be used to store arbitrary extensions that may be needed by custom pipeline stages. +/// Extensions are stored in a type-map and pipeline stages can retrieve them by a known type. +/// +/// This is similar to datafusion's `ExecutionState`, which it also uses for extensions, but +/// without the need for `Send + Sync` bounds, as those are not required in this context due to these +/// pipeline stages executing in a single threaded runtime. This also means that pipeline +/// stages can get mutable references to extensions if needed. +// +// In the future, this may be expanded to include other execution-related state like metrics, or +// other state needed for stateful stream processing. +// +// When the pipeline is executed without an ExecutionState being provided, a default one will be +// created. This means that anything added to this should be inexpensive to initialize in the +// default case. +#[derive(Default)] +pub struct ExecutionState { + extensions: Option, +} + +impl ExecutionState { + /// Create a new execution state with no extensions. + #[must_use] + pub fn new() -> Self { + Self { extensions: None } + } + + /// Get extension of type T, if it exists. + #[must_use] + pub fn get_extension(&self) -> Option<&T> { + self.extensions.as_ref().and_then(|map| { + map.get(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_ref::()) + }) + } + + /// Get mutable extension of type T, if it exists. 
+ #[must_use] + pub fn get_extension_mut(&mut self) -> Option<&mut T> { + self.extensions.as_mut().and_then(|map| { + map.get_mut(&TypeId::of::()) + .and_then(|boxed| boxed.downcast_mut::()) + }) + } + + /// Set extension of type T. + pub fn set_extension(&mut self, value: T) { + let map = self.extensions.get_or_insert_with(ExtensionMap::default); + _ = map.insert(TypeId::of::(), Box::new(value)); + } +} + +/// Map that holds opaque objects indexed by their type. +/// +// Note: this is similar to datafusion's `AnyMap`, which it also uses for extensions, but +// without the `Send + Sync` bounds, as those are not required in this context due to these +// pipeline stages executing in a single threaded runtime. This also means that pipeline +// stages can get mutable references to extensions if needed. +type ExtensionMap = HashMap, BuildHasherDefault>; + +/// Hasher for [`ExtensionMap`]. +/// +// This is the same as the one used by datafusion's `AnyMap`. +// +// With [`TypeId`]s as keys, there's no need to hash them. They are already hashes themselves, +// coming from the compiler. The [`IdHasher`] just holds the [`u64`] of the [`TypeId`], and then +// returns it, instead of doing any bit fiddling. 
+#[derive(Default)] +struct IdHasher(u64); + +impl Hasher for IdHasher { + fn write(&mut self, _: &[u8]) { + unreachable!("TypeId calls write_u64"); + } + + #[inline] + fn write_u64(&mut self, id: u64) { + self.0 = id; + } + + #[inline] + fn finish(&self) -> u64 { + self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq)] + struct TestExtension { + value: i32, + } + + #[derive(Debug, PartialEq)] + struct AnotherExtension { + name: String, + } + + #[test] + fn test_set_and_get_extension() { + let mut state = ExecutionState::new(); + + // Initially, extension should not exist + assert!(state.get_extension::().is_none()); + + // Set an extension + state.set_extension(TestExtension { value: 42 }); + + // Now we should be able to get it + let ext = state.get_extension::(); + assert!(ext.is_some()); + assert_eq!(ext.unwrap().value, 42); + } + + #[test] + fn test_get_extension_mut() { + let mut state = ExecutionState::new(); + + // Set an extension + state.set_extension(TestExtension { value: 10 }); + + // Get mutable reference and modify it + { + let ext = state.get_extension_mut::(); + assert!(ext.is_some()); + ext.unwrap().value = 20; + } + + // Verify the modification + let ext = state.get_extension::(); + assert_eq!(ext.unwrap().value, 20); + } + + #[test] + fn test_multiple_extensions() { + let mut state = ExecutionState::new(); + + // Set multiple different extensions + state.set_extension(TestExtension { value: 100 }); + state.set_extension(AnotherExtension { + name: "test".to_string(), + }); + + // Both should be retrievable independently + let test_ext = state.get_extension::(); + assert!(test_ext.is_some()); + assert_eq!(test_ext.unwrap().value, 100); + + let another_ext = state.get_extension::(); + assert!(another_ext.is_some()); + assert_eq!(another_ext.unwrap().name, "test"); + } + + #[test] + fn test_overwrite_extension() { + let mut state = ExecutionState::new(); + + // Set an extension + state.set_extension(TestExtension 
{ value: 1 }); + + // Overwrite it + state.set_extension(TestExtension { value: 2 }); + + // Should have the new value + let ext = state.get_extension::(); + assert_eq!(ext.unwrap().value, 2); + } +}