Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tremor-pipeline/src/op/trickle/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl TrickleOperator {
ast::Stmt::OperatorDecl(ref op) => {
let op = op.clone().into_static();
let config = mk_node_config(
op.id,
op.node_id.id().to_string(),
format!("{}::{}", op.kind.module, op.kind.operation),
op.params,
);
Expand Down
38 changes: 16 additions & 22 deletions tremor-pipeline/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,24 +426,21 @@ impl Query {
subqueries.insert(s.id.clone(), s.clone());
}
Stmt::Operator(o) => {
if nodes.contains_key(&common_cow(&o.id)) {
let error_func = if has_builtin_node_name(&common_cow(&o.id)) {
if nodes.contains_key(&common_cow(o.node_id.id())) {
let error_func = if has_builtin_node_name(&common_cow(o.node_id.id())) {
query_node_reserved_name_err
} else {
query_node_duplicate_name_err
};
return Err(error_func(o, o.id.clone(), &query.node_meta).into());
return Err(
error_func(o, o.node_id.id().to_string(), &query.node_meta).into()
);
}

let target = o.target.clone().to_string();
let fqon = if o.module.is_empty() {
target
} else {
format!("{}::{}", o.module.join("::"), target)
};
let fqon = o.node_id.target_fqn(&o.target);

let node = NodeConfig {
id: o.id.clone(),
id: o.node_id.id().to_string(),
kind: NodeKind::Operator,
op_type: "trickle::operator".to_string(),
..NodeConfig::default()
Expand Down Expand Up @@ -482,25 +479,22 @@ impl Query {
None,
)?;
pipe_ops.insert(id, op);
nodes.insert(common_cow(&o.id), id);
nodes.insert(common_cow(o.node_id.id()), id);
outputs.push(id);
}
Stmt::Script(o) => {
if nodes.contains_key(&common_cow(&o.id)) {
let error_func = if has_builtin_node_name(&common_cow(&o.id)) {
if nodes.contains_key(&common_cow(o.node_id.id())) {
let error_func = if has_builtin_node_name(&common_cow(o.node_id.id())) {
query_node_reserved_name_err
} else {
query_node_duplicate_name_err
};
return Err(error_func(o, o.id.clone(), &query.node_meta).into());
return Err(
error_func(o, o.node_id.id().to_string(), &query.node_meta).into()
);
}

let target = o.target.clone().to_string();
let fqsn = if o.module.is_empty() {
target
} else {
format!("{}::{}", o.module.join("::"), target)
};
let fqsn = o.node_id.target_fqn(&o.target);

let stmt_srs =
srs::Stmt::try_new_from_query::<&'static str, _>(&self.0.query, |query| {
Expand All @@ -524,7 +518,7 @@ impl Query {
let that_defn = stmt_srs;

let node = NodeConfig {
id: o.id.clone(),
id: o.node_id.id().to_string(),
kind: NodeKind::Script,
label,
op_type: "trickle::script".to_string(),
Expand All @@ -542,7 +536,7 @@ impl Query {
None,
)?;
pipe_ops.insert(id, op);
nodes.insert(common_cow(&o.id), id);
nodes.insert(common_cow(o.node_id.id()), id);
outputs.push(id);
}
};
Expand Down
1 change: 1 addition & 0 deletions tremor-script/src/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub(crate) mod binary;
/// custom equality definition - checking for equivalence of different AST nodes
/// e.g. two different event paths with different metadata
pub mod eq;
mod node_id;
/// Query AST
pub mod query;
pub(crate) mod raw;
Expand Down
102 changes: 102 additions & 0 deletions tremor-script/src/ast/node_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2020-2021, The Tremor Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Identifies a node in the AST.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct NodeId {
/// The ID of the Node
id: String,
/// The module of the Node
module: Vec<String>,
}

impl NodeId {
/// Create a new `NodeId` from an ID and Module list.
pub fn new(id: String, module: Vec<String>) -> Self {
Self { id, module }
}

/// The node's id.
pub fn id(&self) -> &str {
self.id.as_str()
}

/// The node's module.
pub fn module(&self) -> &[String] {
&self.module
}

/// Mutate the node's module.
pub fn module_mut(&mut self) -> &mut Vec<String> {
&mut self.module
}

/// Calculate the fully qualified name from
/// the given module path.
#[must_use]
pub fn fqn(&self) -> String {
if self.module.is_empty() {
self.id.to_string()
} else {
format!("{}::{}", self.module.join("::"), self.id)
}
}

/// Calculate the fully qualified name of some
/// target identifier given this node's module
/// path.
#[must_use]
pub fn target_fqn(&self, target: &str) -> String {
if self.module.is_empty() {
target.to_string()
} else {
format!("{}::{}", self.module.join("::"), target)
}
}
}

#[doc(hidden)]
#[macro_export]
macro_rules! impl_fqn {
($struct:ident) => {
impl $struct<'_> {
fn fqn(&self) -> String {
self.node_id.fqn()
}
}
};
}

#[cfg(test)]
mod test {
use super::NodeId;

#[test]
fn fqn() {
let no_module = NodeId::new("foo".to_string(), vec![]);
assert_eq!(no_module.fqn(), "foo");
assert!(no_module.module().is_empty());

let with_module = NodeId::new(
"foo".to_string(),
vec!["bar".to_string(), "baz".to_string()],
);
assert_eq!(with_module.fqn(), "bar::baz::foo");
assert_eq!(with_module.module(), &["bar", "baz"]);

let target = "quux";
assert_eq!(no_module.target_fqn(target), target);
assert_eq!(with_module.target_fqn(target), "bar::baz::quux");
}
}
98 changes: 23 additions & 75 deletions tremor-script/src/ast/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@

pub(crate) mod raw;
use super::{
error_generic, error_no_consts, error_no_locals, AggrRegistry, EventPath, HashMap, Helper,
Ident, ImutExpr, ImutExprInt, InvokeAggrFn, Location, NodeMetas, Path, Registry, Result,
Script, Serialize, Stmts, Upable, Value,
error_generic, error_no_consts, error_no_locals, node_id::NodeId, AggrRegistry, EventPath,
HashMap, Helper, Ident, ImutExpr, ImutExprInt, InvokeAggrFn, Location, NodeMetas, Path,
Registry, Result, Script, Serialize, Stmts, Upable, Value,
};
use super::{raw::BaseExpr, Consts};
use crate::ast::eq::AstEq;
use crate::impl_expr_mid;
use crate::{ast::eq::AstEq, impl_fqn};
use raw::WindowDefnRaw;

/// A Tremor query
Expand Down Expand Up @@ -164,40 +164,27 @@ impl BaseExpr for OperatorKind {
/// An operator declaration
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct OperatorDecl<'script> {
/// The ID and Module of the Operator
pub node_id: NodeId,
/// metadata id
pub(crate) mid: usize,
/// Type of the operator
pub kind: OperatorKind,
/// Module of the operator
pub module: Vec<String>,
/// Identifer for the operator
pub id: String,
/// Parameters for the operator
pub params: Option<HashMap<String, Value<'script>>>,
}
impl_expr_mid!(OperatorDecl);

impl<'script> OperatorDecl<'script> {
/// Calculate the fully qualified name
#[must_use]
pub fn fqon(&self, module: &[String]) -> String {
if module.is_empty() {
self.id.clone()
} else {
format!("{}::{}", module.join("::"), self.id)
}
}
}
impl_fqn!(OperatorDecl);

/// An operator creation
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct OperatorStmt<'script> {
/// The ID and Module of the Operator
pub node_id: NodeId,
/// metadata id
pub(crate) mid: usize,
/// Id of the operator
pub id: String,
/// Target of the operator
pub target: String,
/// Module of the script
pub module: Vec<String>,
/// parameters of the instance
pub params: Option<HashMap<String, Value<'script>>>,
}
Expand All @@ -207,52 +194,36 @@ impl_expr_mid!(OperatorStmt);
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ScriptDecl<'script> {
pub(crate) mid: usize,
/// Module of the script
pub module: Vec<String>,
/// ID of the script
pub id: String,
/// The ID and Module of the Script
pub node_id: NodeId,
/// Parameters of a script declaration
pub params: Option<HashMap<String, Value<'script>>>,
/// The script itself
pub script: Script<'script>,
}
impl_expr_mid!(ScriptDecl);

impl<'script> ScriptDecl<'script> {
/// Calculate the fully qualified name
#[must_use]
pub fn fqsn(&self, module: &[String]) -> String {
if module.is_empty() {
self.id.clone()
} else {
format!("{}::{}", module.join("::"), self.id)
}
}
}

/// A script creation
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ScriptStmt<'script> {
/// The ID and Module of the Script
pub node_id: NodeId,
/// metadata id
pub(crate) mid: usize,
/// ID of the script
pub id: String,
/// Target of the script
pub target: String,
/// Parameters of the script statement
pub params: Option<HashMap<String, Value<'script>>>,
/// Module path of the script
pub module: Vec<String>,
}
impl_expr_mid!(ScriptStmt);

/// A subquery declaration
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SubqueryDecl<'script> {
/// The ID and Module of the SubqueryDecl
pub node_id: NodeId,
/// metadata id
pub(crate) mid: usize,
/// Module of the subquery
pub module: Vec<String>,
/// ID of the subquery
pub id: String,
/// Parameters of a subquery declaration
pub params: Option<HashMap<String, Value<'script>>>,
/// Input Ports
Expand All @@ -263,18 +234,7 @@ pub struct SubqueryDecl<'script> {
pub raw_stmts: raw::StmtsRaw<'script>,
}
impl_expr_mid!(SubqueryDecl);

impl<'script> SubqueryDecl<'script> {
/// Calculate the fully qualified name
#[must_use]
pub fn fqsqn(&self, module: &[String]) -> String {
if module.is_empty() {
self.id.clone()
} else {
format!("{}::{}", module.join("::"), self.id)
}
}
}
impl_fqn!(SubqueryDecl);

/// A subquery creation
#[derive(Clone, Debug, PartialEq, Serialize)]
Expand Down Expand Up @@ -305,12 +265,10 @@ pub enum WindowKind {
/// A window declaration
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct WindowDecl<'script> {
/// ID and Module of the Window
pub node_id: NodeId,
/// metadata id
pub(crate) mid: usize,
/// Module of the window declaration
pub module: Vec<String>,
/// Name of the window declaration
pub id: String,
/// The type of window
pub kind: WindowKind,
/// Parameters passed to the window
Expand All @@ -319,6 +277,7 @@ pub struct WindowDecl<'script> {
pub script: Option<Script<'script>>,
}
impl_expr_mid!(WindowDecl);
impl_fqn!(WindowDecl);

impl<'script> WindowDecl<'script> {
/// `emit_empty_windows` setting
Expand All @@ -329,16 +288,6 @@ impl<'script> WindowDecl<'script> {
pub const INTERVAL: &'static str = "interval";
/// `size` setting
pub const SIZE: &'static str = "size";

/// Calculate the fully qualified window name
#[must_use]
pub fn fqwn(&self, module: &[String]) -> String {
if module.is_empty() {
self.id.clone()
} else {
format!("{}::{}", module.join("::"), self.id)
}
}
}

/// A select statement
Expand All @@ -354,7 +303,6 @@ pub struct Select<'script> {
pub target: ImutExpr<'script>,
/// Where clause
pub maybe_where: Option<ImutExpr<'script>>,

/// Having clause
pub maybe_having: Option<ImutExpr<'script>>,
/// Group-By clause
Expand Down
Loading