Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,14 @@ fn process_record<'a, TRecord: Record + 'static>(
);
break;
}
DataExpression::Output(c) => {
execution_context.add_diagnostic_if_enabled(
RecordSetEngineDiagnosticLevel::Error,
c,
|| "Output Expression not yet supported in record set engine".into(),
);
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum DataExpression {

/// Conditional data expression.
Conditional(ConditionalDataExpression),

/// Output data expression
Output(OutputDataExpression),
}

impl DataExpression {
Expand All @@ -28,6 +31,7 @@ impl DataExpression {
DataExpression::Summary(s) => s.try_fold(scope),
DataExpression::Transform(t) => t.try_fold(scope),
DataExpression::Conditional(c) => c.try_fold(scope),
DataExpression::Output(o) => o.try_fold(scope),
}
}
}
Expand All @@ -39,6 +43,7 @@ impl Expression for DataExpression {
DataExpression::Summary(s) => s.get_query_location(),
DataExpression::Transform(t) => t.get_query_location(),
DataExpression::Conditional(c) => c.get_query_location(),
DataExpression::Output(o) => o.get_query_location(),
}
}

Expand All @@ -48,6 +53,7 @@ impl Expression for DataExpression {
DataExpression::Summary(_) => "DataExpression(Summary)",
DataExpression::Transform(_) => "DataExpression(Transform)",
DataExpression::Conditional(_) => "DataExpression(Conditional)",
DataExpression::Output(_) => "DataExpression(Output)",
}
}

Expand All @@ -57,6 +63,7 @@ impl Expression for DataExpression {
DataExpression::Summary(s) => s.fmt_with_indent(f, indent),
DataExpression::Transform(t) => t.fmt_with_indent(f, indent),
DataExpression::Conditional(c) => c.fmt_with_indent(f, indent),
DataExpression::Output(o) => o.fmt_with_indent(f, indent),
}
}
}
Expand Down Expand Up @@ -295,3 +302,89 @@ impl ConditionalDataExpressionBranch {
&self.expressions
}
}

/// Data expression representing an operation that emits data to a sink.
#[derive(Debug, Clone, PartialEq)]
pub struct OutputDataExpression {
query_location: QueryLocation,
output: OutputExpression,
}

impl OutputDataExpression {
pub fn new(query_location: QueryLocation, output: OutputExpression) -> Self {
Self {
query_location,
output,
}
}

pub fn get_output(&self) -> &OutputExpression {
&self.output
}

pub fn try_fold(&mut self, _scope: &PipelineResolutionScope) -> Result<(), ExpressionError> {
// No folding currently supported for output expressions.
Ok(())
}
}

impl Expression for OutputDataExpression {
fn get_query_location(&self) -> &QueryLocation {
&self.query_location
}

fn get_name(&self) -> &'static str {
"OutputDataExpression"
}

fn fmt_with_indent(&self, f: &mut std::fmt::Formatter<'_>, indent: &str) -> std::fmt::Result {
writeln!(f, "Output:")?;
write!(f, "{indent}└── ")?;
match &self.output {
OutputExpression::NamedSink(expr) => {
expr.fmt_with_indent(f, format!("{indent} ").as_str())
}
}
}
}

/// Expression representing an operation that emits data to a sink.
#[derive(Debug, Clone, PartialEq)]
pub enum OutputExpression {
/// Output data to a sink identified by name.
/// Currently this contains a static string because it's the only way we handle identifying
/// where to output the data. In the future we could support dynamic sink identified by a
/// variable, result of a function call, or other some expression, at which point we can change
/// this to contain the more general `StaticExpression`.
NamedSink(StringScalarExpression),
Comment on lines +354 to +359
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@albertlockett I just noticed this. Sorry for the late feedback but here it is...

  1. "Sink" What is a sink in the context of AST? Maybe "NamedDestination" would be better. Remember it is an explicit goal this AST\code NOT be coupled to the collector.

  2. Current design seems very limited. As far as routing is concerned, I can see different needs. Fan out\send\split, stuff like that.

    Consider something like this:

    enum RouteDataExpression {
      MapTo(RouteDestinationExpression), // Send the data and quit
      SplitTo(RouteDestinationExpression) // Send a copy of the data and continue on
    }
    
    enum RouteDestinationExpression {
       Named(StringScalarExpression)
    }

Copy link
Copy Markdown
Member Author

@albertlockett albertlockett Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re point 1: imo "sink" is kind of a synonym for to "destination". I don't see how one is any more coupled to the collector than the other, but I'm fine to change it to destination if you think it's a better name. I'm not personally hung up on the naming one way or the other.

re point 2: the intention of the output expression was simply that the data would be emitted to some destination, and that the variants of the enum would control how the destination is defined. We could change the design to have the expression variants control how data is routed in the pipeline, but that could also be achieved through other types of data expressions. For example, we could imagine a fork with multiple branches as child expressions, which could achieve the "copy the data and continue on" scenario in your example.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like we need a design discussion 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that'd be great! We can discuss during wednesday's SIG meeting

}

#[cfg(test)]
mod test {
use super::*;
use std::fmt;

// Helper struct to test fmt_with_indent by implementing Display
struct DisplayWrapper<'a, T: Expression>(&'a T, &'a str);

impl<'a, T: Expression> fmt::Display for DisplayWrapper<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt_with_indent(f, self.1)
}
}

#[test]
fn test_output_expression_fmt_with_indent() {
let string_expr = StringScalarExpression::new(QueryLocation::new_fake(), "sink_name");
let output_expr = OutputExpression::NamedSink(string_expr.clone());
let output_data_expr = OutputDataExpression::new(QueryLocation::new_fake(), output_expr);
let output = format!("{}", DisplayWrapper(&output_data_expr, ""));
assert_eq!(
output,
format!(
"Output:\n\
└── {string_expr:?}\n"
)
);
}
}
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/engine/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl From<Context8u8> for f64 {
}
}

/// Standard context values hold three caller-specified fields. The
/// Standard context values hold three caller-specified fields. The
/// size is arbitrary, but shouldn't be larger than needed by
/// callers. For example: retry count, sequence and generation
/// numbers, deadline, num_items, etc.
Expand Down
6 changes: 5 additions & 1 deletion rust/otap-dataflow/crates/engine/src/testing/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ impl ValidateContext {
}
}

/// The name of the out_port that will be configured automatically on the [`ProcessorWrapper`] by
/// the [`TestRuntime`].
pub const TEST_OUT_PORT_NAME: &str = "out";

/// A test runtime for simplifying processor tests.
///
/// This structure encapsulates the common setup logic needed for testing processors,
Expand Down Expand Up @@ -224,7 +228,7 @@ impl<PData: Clone + Debug + 'static> TestRuntime<PData> {
// Set the output sender for the processor
let _ = processor.set_pdata_sender(
test_node(self.config().name.clone()),
"out".into(),
TEST_OUT_PORT_NAME.into(),
pdata_sender,
);

Expand Down
5 changes: 5 additions & 0 deletions rust/otap-dataflow/crates/opl/src/opl.pest
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,14 @@ where_operator_call = {
"where" ~ expression
}

route_to_operator_call = {
"route_to" ~ string_literal
}

operator_call = {
set_operator_call
| if_else_operator_call
| route_to_operator_call
| where_operator_call
}

Expand Down
74 changes: 68 additions & 6 deletions rust/otap-dataflow/crates/opl/src/parser/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

use data_engine_expressions::{
ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression,
DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression, QueryLocation,
DiscardDataExpression, Expression, LogicalExpression, NotLogicalExpression,
OutputDataExpression, OutputExpression, QueryLocation, StaticScalarExpression,
TransformExpression,
};
use data_engine_parser_abstractions::{ParserError, to_query_location};
use data_engine_parser_abstractions::{
ParserError, parse_standard_string_literal, to_query_location,
};
use pest::iterators::Pair;

use crate::parser::assignment::parse_assignment_expression;
Expand All @@ -20,8 +23,9 @@ pub(crate) fn parse_operator_call(
) -> Result<(), ParserError> {
for rule in rule.into_inner() {
match rule.as_rule() {
Rule::set_operator_call => parse_set_operator_call(rule, pipeline_builder)?,
Rule::if_else_operator_call => parse_if_else_opeartor_call(rule, pipeline_builder)?,
Rule::route_to_operator_call => parse_route_to_operator_call(rule, pipeline_builder)?,
Rule::set_operator_call => parse_set_operator_call(rule, pipeline_builder)?,
Rule::where_operator_call => parse_where_operator_call(rule, pipeline_builder)?,
invalid_rule => {
let query_location = to_query_location(&rule);
Expand All @@ -37,6 +41,41 @@ pub(crate) fn parse_operator_call(
Ok(())
}

pub(crate) fn parse_route_to_operator_call(
operator_call_rule: Pair<'_, Rule>,
pipeline_builder: &mut dyn PipelineBuilder,
) -> Result<(), ParserError> {
let query_location = to_query_location(&operator_call_rule);
if let Some(rule) = operator_call_rule.into_inner().next() {
let rule_query_location = to_query_location(&rule);
let dest = match rule.as_rule() {
Rule::string_literal => match parse_standard_string_literal(rule) {
StaticScalarExpression::String(string) => string,
invalid_expr => {
return Err(ParserError::SyntaxError(
rule_query_location,
format!("Expected static string literal, found {:?}", invalid_expr),
));
}
},
invalid_rule => {
let query_location = to_query_location(&rule);
return Err(invalid_child_rule_error(
query_location,
Rule::string_literal,
invalid_rule,
));
}
};

let output_expr =
OutputDataExpression::new(query_location, OutputExpression::NamedSink(dest));
pipeline_builder.push_data_expression(DataExpression::Output(output_expr));
}

Ok(())
}

pub(crate) fn parse_set_operator_call(
operator_call_rule: Pair<'_, Rule>,
pipeline_builder: &mut dyn PipelineBuilder,
Expand Down Expand Up @@ -211,9 +250,9 @@ mod tests {
use data_engine_expressions::{
ConditionalDataExpression, ConditionalDataExpressionBranch, DataExpression,
DiscardDataExpression, EqualToLogicalExpression, LogicalExpression, MutableValueExpression,
NotLogicalExpression, QueryLocation, ScalarExpression, SetTransformExpression,
SourceScalarExpression, StaticScalarExpression, StringScalarExpression,
TransformExpression, ValueAccessor,
NotLogicalExpression, OutputDataExpression, OutputExpression, QueryLocation,
ScalarExpression, SetTransformExpression, SourceScalarExpression, StaticScalarExpression,
StringScalarExpression, TransformExpression, ValueAccessor,
};
use data_engine_parser_abstractions::{Parser, ParserOptions, ParserState};
use pest::Parser as _;
Expand All @@ -223,6 +262,29 @@ mod tests {
use crate::parser::pest::OplPestParser;
use crate::parser::{OplParser, Rule};

#[test]
fn test_route_to_operator_call() {
let query = "route_to \"test_out_port\"";
let mut state = ParserState::new(query);
let parse_result = OplPestParser::parse(Rule::operator_call, query).unwrap();
assert_eq!(parse_result.len(), 1);
let rule = parse_result.into_iter().next().unwrap();
parse_operator_call(rule, &mut state).unwrap();
let result = state.build().unwrap();
let expressions = result.get_expressions();
assert_eq!(expressions.len(), 1);

let expected = DataExpression::Output(OutputDataExpression::new(
QueryLocation::new_fake(),
OutputExpression::NamedSink(StringScalarExpression::new(
QueryLocation::new_fake(),
"test_out_port",
)),
));

assert_eq!(&expressions[0], &expected);
}

#[test]
fn test_parse_set_operator_call() {
let query = "set severity_text = \"ERROR\"";
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/otap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ otap-df-engine = { path = "../engine" }
otap-df-engine-macros = { path = "../engine-macros" }
otap-df-channel = { path = "../channel" }
otap-df-config = { path = "../config" }
otap-df-opl = { path = "../opl" }
otap-df-pdata = { path = "../pdata" }
otap-df-query-engine = { path = "../query-engine" }
otap-df-telemetry = { path = "../telemetry" }
Expand Down
Loading
Loading