diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 33dc519f9..e9aa0789d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -61,7 +61,7 @@ jobs: with: rust-version: stable - name: Run tests (excluding doctests) - run: cargo test --lib --tests --bins + run: RUST_MIN_STACK=8388608 cargo test --lib --tests --bins - name: Verify Working Directory Clean run: git diff --exit-code @@ -77,7 +77,7 @@ jobs: - name: Run tests (excluding doctests) shell: bash run: | - cargo test --lib --tests --bins + RUST_MIN_STACK=8388608 cargo test --lib --tests --bins macos: name: cargo test (macos) @@ -90,7 +90,7 @@ jobs: uses: ./.github/actions/rust/setup-macos-builder - name: Run tests (excluding doctests) shell: bash - run: cargo test --lib --tests --bins + run: RUST_MIN_STACK=8388608 cargo test --lib --tests --bins macos-aarch64: name: cargo test (macos-aarch64) @@ -103,7 +103,7 @@ jobs: uses: ./.github/actions/rust/setup-macos-aarch64-builder - name: Run tests (excluding doctests) shell: bash - run: cargo test --lib --tests --bins + run: RUST_MIN_STACK=8388608 cargo test --lib --tests --bins check-fmt: name: Check cargo fmt diff --git a/wren-core/core/src/logical_plan/analyze/plan.rs b/wren-core/core/src/logical_plan/analyze/plan.rs index 2be75dce4..c90a4c33e 100644 --- a/wren-core/core/src/logical_plan/analyze/plan.rs +++ b/wren-core/core/src/logical_plan/analyze/plan.rs @@ -7,7 +7,8 @@ use std::sync::Arc; use datafusion::arrow::datatypes::Field; use datafusion::common::{ - internal_err, plan_err, Column, DFSchema, DFSchemaRef, TableReference, + internal_datafusion_err, internal_err, plan_err, Column, DFSchema, DFSchemaRef, + TableReference, }; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::expr::WildcardOptions; @@ -198,21 +199,29 @@ impl ModelPlanNodeBuilder { } else { merge_graph(&mut self.directed_graph, column_graph)?; if self.is_contain_calculation_source(&qualified_column) { - collect_partial_model_plan( + collect_partial_model_plan_for_calculation( Arc::clone(&self.analyzed_wren_mdl), Arc::clone(&self.session_state), &qualified_column, &mut self.model_required_fields, )?; } + // Collect the column for building the partial model for the related model. + collect_partial_model_required_fields( + Arc::clone(&self.analyzed_wren_mdl), + Arc::clone(&self.session_state), + &qualified_column, + &mut self.model_required_fields, + )?; self.required_exprs_buffer .insert(OrdExpr::new(expr.clone())); - let _ = collect_model_required_fields( - &qualified_column, + // Collect the column for building the source model + collect_model_required_fields( Arc::clone(&self.analyzed_wren_mdl), Arc::clone(&self.session_state), + &qualified_column, &mut self.model_required_fields, - ); + )?; } } else { let expr_plan = get_remote_column_exp( @@ -294,7 +303,6 @@ impl ModelPlanNodeBuilder { .get(&model_ref) .map(|c| c.iter().cloned().map(|c| c.expr).collect()) .unwrap_or_default(); - let mut calculate_iter = self.required_calculation.iter(); let source_chain = if !source_required_fields.is_empty() || required_fields.is_empty() { @@ -438,7 +446,7 @@ impl ModelPlanNodeBuilder { let mut partial_model_required_fields = HashMap::new(); if self.is_contain_calculation_source(qualified_column) { - collect_partial_model_plan( + collect_partial_model_plan_for_calculation( Arc::clone(&self.analyzed_wren_mdl), Arc::clone(&self.session_state), qualified_column, @@ -446,10 +454,10 @@ impl ModelPlanNodeBuilder { )?; } - collect_model_required_fields( - qualified_column, + collect_partial_model_required_fields( Arc::clone(&self.analyzed_wren_mdl), Arc::clone(&self.session_state), + qualified_column, &mut partial_model_required_fields, )?; @@ -505,7 +513,9 @@ fn is_required_column(expr: &Expr, name: &str) -> bool { } } -fn collect_partial_model_plan( +/// Collect the fields for the calculation plan. +/// It collects the only calculated fields for the calculation plan. +fn collect_partial_model_plan_for_calculation( analyzed_wren_mdl: Arc, session_state_ref: SessionStateRef, qualified_column: &Column, @@ -547,11 +557,56 @@ fn collect_partial_model_plan( Ok(()) } -fn collect_model_required_fields( +/// Collect the required fields for the partial model used by another model throguh the relationship. +/// It collects the non-calculated fields for the he partial model used by another model. +fn collect_partial_model_required_fields( + analyzed_wren_mdl: Arc, + session_state_ref: SessionStateRef, qualified_column: &Column, + required_fields: &mut HashMap>, +) -> Result<()> { + let Some(set) = analyzed_wren_mdl + .lineage() + .required_fields_map + .get(qualified_column) + else { + return plan_err!("Required fields not found for {}", qualified_column); + }; + + for c in set { + let Some(relation_ref) = &c.relation else { + return plan_err!("Source dataset not found for {}", c); + }; + let Some(ColumnReference { dataset, column }) = + analyzed_wren_mdl.wren_mdl().get_column_reference(c) + else { + return plan_err!("Column reference not found for {}", c); + }; + if !column.is_calculated { + let expr = create_wren_expr_for_model( + &c.name, + dataset.try_as_model().ok_or_else(|| { + internal_datafusion_err!("Only support model as source dataset") + })?, + Arc::clone(&session_state_ref), + )?; + required_fields + .entry(relation_ref.clone()) + .or_default() + .insert(OrdExpr::with_column(expr, Arc::clone(&column))); + } + } + Ok(()) +} + +/// Collect the required field for the model plan. +/// It collect the calculated fields for building the calculation plan. +/// It collects the non-calculated source column for building the model source plan. +fn collect_model_required_fields( analyzed_wren_mdl: Arc, session_state_ref: SessionStateRef, - model_required_fields: &mut HashMap>, + qualified_column: &Column, + required_fields: &mut HashMap>, ) -> Result<()> { let Some(set) = analyzed_wren_mdl .lineage() @@ -591,7 +646,7 @@ fn collect_model_required_fields( } .alias(column.name.clone()); debug!("Required Calculated field: {}", &expr_plan); - model_required_fields + required_fields .entry(relation_ref.clone()) .or_default() .insert(OrdExpr::with_column(expr_plan, column)); @@ -603,7 +658,7 @@ fn collect_model_required_fields( Arc::clone(&session_state_ref), )?; debug!("Required field: {}", &expr_plan); - model_required_fields + required_fields .entry(relation_ref.clone()) .or_default() .insert(OrdExpr::with_column(expr_plan, column)); diff --git a/wren-core/core/src/logical_plan/analyze/relation_chain.rs b/wren-core/core/src/logical_plan/analyze/relation_chain.rs index 4521b0f80..8281bd1f6 100644 --- a/wren-core/core/src/logical_plan/analyze/relation_chain.rs +++ b/wren-core/core/src/logical_plan/analyze/relation_chain.rs @@ -14,10 +14,10 @@ use crate::mdl::Dataset; use crate::mdl::{AnalyzedWrenMDL, SessionStateRef}; use crate::{mdl, DataFusionError}; use datafusion::common::alias::AliasGenerator; -use datafusion::common::TableReference; use datafusion::common::{ internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef, Result, }; +use datafusion::common::{plan_datafusion_err, TableReference}; use datafusion::logical_expr::{ col, Expr, Extension, LogicalPlan, LogicalPlanBuilder, SubqueryAlias, UserDefinedLogicalNodeCore, @@ -93,37 +93,33 @@ impl RelationChain { }; match target { Dataset::Model(target_model) => { - let node = if fields.iter().any(|e| { - e.column.is_some() && e.column.clone().unwrap().is_calculated - }) { - let schema = create_schema( - fields.iter().filter_map(|e| e.column.clone()).collect(), - )?; - let plan = ModelPlanNode::new( - Arc::clone(target_model), - fields.iter().cloned().map(|c| c.expr).collect(), - None, - Arc::clone(&analyzed_wren_mdl), - Arc::clone(&session_state_ref), - Arc::clone(&properties), - )?; + let schema = create_schema( + fields + .iter() + .map(|e| { + e.column.clone().ok_or_else(|| { + plan_datafusion_err!( + "Required field {:?} has no physical column", + e.expr + ) + }) + }) + .collect::>()?, + )?; + let exprs = fields.iter().cloned().map(|c| c.expr).collect(); + let plan = ModelPlanNode::new( + Arc::clone(target_model), + exprs, + None, + Arc::clone(&analyzed_wren_mdl), + Arc::clone(&session_state_ref), + Arc::clone(&properties), + )?; - let df_schema = - DFSchemaRef::from(DFSchema::try_from(schema).unwrap()); - LogicalPlan::Extension(Extension { - node: Arc::new(PartialModelPlanNode::new(plan, df_schema)), - }) - } else { - LogicalPlan::Extension(Extension { - node: Arc::new(ModelSourceNode::new( - Arc::clone(target_model), - fields.iter().cloned().map(|c| c.expr).collect(), - Arc::clone(&analyzed_wren_mdl), - Arc::clone(&session_state_ref), - None, - )?), - }) - }; + let df_schema = DFSchemaRef::from(DFSchema::try_from(schema)?); + let node = LogicalPlan::Extension(Extension { + node: Arc::new(PartialModelPlanNode::new(plan, df_schema)), + }); relation_chain = RelationChain::Chain( node, link.join_type, diff --git a/wren-core/core/src/mdl/mod.rs b/wren-core/core/src/mdl/mod.rs index 74f09fb9d..8230ba220 100644 --- a/wren-core/core/src/mdl/mod.rs +++ b/wren-core/core/src/mdl/mod.rs @@ -574,13 +574,7 @@ mod test { sql, ) .await?; - assert_snapshot!(result, @"SELECT \"profile\".totalcost FROM (SELECT totalcost.totalcost FROM \ - (SELECT __relation__2.p_custkey AS p_custkey, sum(CAST(__relation__2.o_totalprice AS BIGINT)) AS totalcost FROM \ - (SELECT __relation__1.c_custkey, orders.o_custkey, orders.o_totalprice, __relation__1.p_custkey FROM \ - (SELECT __source.o_custkey AS o_custkey, __source.o_totalprice AS o_totalprice FROM orders AS __source) AS orders RIGHT JOIN \ - (SELECT customer.c_custkey, \"profile\".p_custkey FROM (SELECT __source.c_custkey AS c_custkey FROM customer AS __source) AS customer RIGHT JOIN \ - (SELECT __source.p_custkey AS p_custkey FROM \"profile\" AS __source) AS \"profile\" ON customer.c_custkey = \"profile\".p_custkey) AS __relation__1 \ - ON orders.o_custkey = __relation__1.c_custkey) AS __relation__2 GROUP BY __relation__2.p_custkey) AS totalcost) AS \"profile\""); + assert_snapshot!(result, @r#"SELECT "profile".totalcost FROM (SELECT totalcost.totalcost FROM (SELECT __relation__2.p_custkey AS p_custkey, sum(CAST(__relation__2.o_totalprice AS BIGINT)) AS totalcost FROM (SELECT __relation__1.c_custkey, orders.o_custkey, orders.o_totalprice, __relation__1.p_custkey FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey, __source.o_totalprice AS o_totalprice FROM orders AS __source) AS orders) AS orders) AS orders RIGHT JOIN (SELECT customer.c_custkey, "profile".p_custkey FROM (SELECT customer.c_custkey FROM (SELECT customer.c_custkey FROM (SELECT __source.c_custkey AS c_custkey FROM customer AS __source) AS customer) AS customer) AS customer RIGHT JOIN (SELECT __source.p_custkey AS p_custkey FROM "profile" AS __source) AS "profile" ON customer.c_custkey = "profile".p_custkey) AS __relation__1 ON orders.o_custkey = __relation__1.c_custkey) AS __relation__2 GROUP BY __relation__2.p_custkey) AS totalcost) AS "profile""#); let sql = "select totalcost from profile where p_sex = 'M'"; let result = transform_sql_with_ctx( @@ -592,16 +586,7 @@ mod test { ) .await?; assert_snapshot!(result, - @"SELECT \"profile\".totalcost FROM (SELECT __relation__1.p_sex, __relation__1.totalcost FROM \ - (SELECT totalcost.p_custkey, \"profile\".p_sex, totalcost.totalcost FROM (SELECT __relation__2.p_custkey AS p_custkey, \ - sum(CAST(__relation__2.o_totalprice AS BIGINT)) AS totalcost FROM (SELECT __relation__1.c_custkey, orders.o_custkey, \ - orders.o_totalprice, __relation__1.p_custkey FROM (SELECT __source.o_custkey AS o_custkey, __source.o_totalprice AS o_totalprice \ - FROM orders AS __source) AS orders RIGHT JOIN (SELECT customer.c_custkey, \"profile\".p_custkey FROM \ - (SELECT __source.c_custkey AS c_custkey FROM customer AS __source) AS customer RIGHT JOIN \ - (SELECT __source.p_custkey AS p_custkey FROM \"profile\" AS __source) AS \"profile\" ON customer.c_custkey = \"profile\".p_custkey) AS __relation__1 \ - ON orders.o_custkey = __relation__1.c_custkey) AS __relation__2 GROUP BY __relation__2.p_custkey) AS totalcost RIGHT JOIN \ - (SELECT __source.p_custkey AS p_custkey, __source.p_sex AS p_sex FROM \"profile\" AS __source) AS \"profile\" \ - ON totalcost.p_custkey = \"profile\".p_custkey) AS __relation__1) AS \"profile\" WHERE \"profile\".p_sex = 'M'"); + @r#"SELECT "profile".totalcost FROM (SELECT __relation__1.p_sex, __relation__1.totalcost FROM (SELECT totalcost.p_custkey, "profile".p_sex, totalcost.totalcost FROM (SELECT __relation__2.p_custkey AS p_custkey, sum(CAST(__relation__2.o_totalprice AS BIGINT)) AS totalcost FROM (SELECT __relation__1.c_custkey, orders.o_custkey, orders.o_totalprice, __relation__1.p_custkey FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey, __source.o_totalprice AS o_totalprice FROM orders AS __source) AS orders) AS orders) AS orders RIGHT JOIN (SELECT customer.c_custkey, "profile".p_custkey FROM (SELECT customer.c_custkey FROM (SELECT customer.c_custkey FROM (SELECT __source.c_custkey AS c_custkey FROM customer AS __source) AS customer) AS customer) AS customer RIGHT JOIN (SELECT __source.p_custkey AS p_custkey FROM "profile" AS __source) AS "profile" ON customer.c_custkey = "profile".p_custkey) AS __relation__1 ON orders.o_custkey = __relation__1.c_custkey) AS __relation__2 GROUP BY __relation__2.p_custkey) AS totalcost RIGHT JOIN (SELECT __source.p_custkey AS p_custkey, __source.p_sex AS p_sex FROM "profile" AS __source) AS "profile" ON totalcost.p_custkey = "profile".p_custkey) AS __relation__1) AS "profile" WHERE "profile".p_sex = 'M'"#); Ok(()) } @@ -2097,71 +2082,101 @@ mod test { let sql = "SELECT * FROM orders"; assert_snapshot!( transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], headers.clone(), sql).await?, - @"SELECT orders.o_orderkey, orders.o_custkey, orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name, __relation__1.o_custkey, __relation__1.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders WHERE orders.customer_name = 'Gura'" + @"SELECT orders.o_orderkey, orders.o_custkey, orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name, __relation__1.o_custkey, __relation__1.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer) AS customer) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders WHERE orders.customer_name = 'Gura'" ); let sql = "SELECT * FROM orders where o_orderkey > 10"; assert_snapshot!( transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], headers, sql).await?, - @"SELECT orders.o_orderkey, orders.o_custkey, orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name, __relation__1.o_custkey, __relation__1.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders WHERE orders.o_orderkey > 10 AND orders.customer_name = 'Gura'" + @"SELECT orders.o_orderkey, orders.o_custkey, orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name, __relation__1.o_custkey, __relation__1.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer) AS customer) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders WHERE orders.o_orderkey > 10 AND orders.customer_name = 'Gura'" ); // TODO: the rlac rule should be applied for the model used by the calculated field // both to_one or to_many relationship should be supported // - // let manifest = ManifestBuilder::new() - // .catalog("wren") - // .schema("test") - // .model( - // ModelBuilder::new("customer") - // .table_reference("customer") - // .column(ColumnBuilder::new("c_custkey", "int").build()) - // .column(ColumnBuilder::new("c_nationkey", "int").build()) - // .column(ColumnBuilder::new("c_name", "string").build()) - // .primary_key("c_custkey") - // .add_row_level_access_control( - // "nation rule", - // vec![SessionProperty::new_optional("session_nation", None)], - // "c_nationkey = @session_nation", - // ) - // .build(), - // ) - // .model( - // ModelBuilder::new("orders") - // .table_reference("orders") - // .column(ColumnBuilder::new("o_orderkey", "int").build()) - // .column(ColumnBuilder::new("o_custkey", "int").build()) - // .column( - // ColumnBuilder::new("customer", "customer") - // .relationship("customer_orders") - // .build(), - // ) - // .column( - // ColumnBuilder::new("customer_name", "string") - // .calculated(true) - // .expression("customer.c_name") - // .build(), - // ) - // .primary_key("o_orderkey") - // .build(), - // ) - // .relationship( - // RelationshipBuilder::new("customer_orders") - // .model("customer") - // .model("orders") - // .join_type(JoinType::OneToMany) - // .condition("customer.c_custkey = orders.o_custkey") - // .build(), - // ) - // .build(); - // let analyzed_mdl = Arc::new(AnalyzedWrenMDL::analyze(manifest)?); - // let headers = - // build_headers(&[("session_nation".to_string(), Some("1".to_string()))]); - // let sql = "SELECT customer_name FROM orders"; - // assert_snapshot!( - // transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], headers, sql).await?, - // @"SELECT orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders" - // ); + let manifest = ManifestBuilder::new() + .catalog("wren") + .schema("test") + .model( + ModelBuilder::new("customer") + .table_reference("customer") + .column(ColumnBuilder::new("c_custkey", "int").build()) + .column(ColumnBuilder::new("c_nationkey", "int").build()) + .column(ColumnBuilder::new("c_name", "string").build()) + .column( + ColumnBuilder::new_relationship( + "orders", + "orders", + "customer_orders", + ) + .build(), + ) + .column( + ColumnBuilder::new_calculated("totalprice", "int") + .expression("sum(orders.o_totalprice)") + .build(), + ) + .primary_key("c_custkey") + .add_row_level_access_control( + "nation rule", + vec![SessionProperty::new_optional("session_nation", None)], + "c_nationkey = @session_nation", + ) + .build(), + ) + .model( + ModelBuilder::new("orders") + .table_reference("orders") + .column(ColumnBuilder::new("o_orderkey", "int").build()) + .column(ColumnBuilder::new("o_custkey", "int").build()) + .column(ColumnBuilder::new("o_totalprice", "int").build()) + .column( + ColumnBuilder::new_relationship( + "customer", + "customer", + "customer_orders", + ) + .build(), + ) + .column( + ColumnBuilder::new_calculated("customer_name", "string") + .expression("customer.c_name") + .build(), + ) + .primary_key("o_orderkey") + .add_row_level_access_control( + "user rule", + vec![SessionProperty::new_optional("session_user", None)], + "o_custkey = @session_user", + ) + .build(), + ) + .relationship( + RelationshipBuilder::new("customer_orders") + .model("customer") + .model("orders") + .join_type(JoinType::OneToMany) + .condition("customer.c_custkey = orders.o_custkey") + .build(), + ) + .build(); + let analyzed_mdl = Arc::new(AnalyzedWrenMDL::analyze(manifest)?); + let headers = + build_headers(&[("session_nation".to_string(), Some("1".to_string()))]); + let sql = "SELECT customer_name FROM orders"; + // test custoer model used by customer_name should be filtered by nation rule. + assert_snapshot!( + transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], headers, sql).await?, + @"SELECT orders.customer_name FROM (SELECT __relation__1.c_name AS customer_name FROM (SELECT customer.c_custkey, customer.c_name, orders.o_custkey, orders.o_orderkey FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT customer.c_custkey, customer.c_name, customer.c_nationkey FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name, __source.c_nationkey AS c_nationkey FROM customer AS __source) AS customer) AS customer WHERE customer.c_nationkey = 1) AS customer RIGHT JOIN (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey FROM orders AS __source) AS orders ON customer.c_custkey = orders.o_custkey) AS __relation__1) AS orders" + ); + let headers = + build_headers(&[("session_user".to_string(), Some("1".to_string()))]); + let sql = "SELECT totalprice FROM customer"; + // test orders model used by totalprice should be filtered by user rule. + assert_snapshot!( + transform_sql_with_ctx(&ctx, Arc::clone(&analyzed_mdl), &[], headers, sql).await?, + @"SELECT customer.totalprice FROM (SELECT totalprice.totalprice FROM (SELECT __relation__1.c_custkey AS c_custkey, sum(CAST(__relation__1.o_totalprice AS BIGINT)) AS totalprice FROM (SELECT customer.c_custkey, orders.o_custkey, orders.o_totalprice FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT orders.o_custkey, orders.o_totalprice FROM (SELECT __source.o_custkey AS o_custkey, __source.o_orderkey AS o_orderkey, __source.o_totalprice AS o_totalprice FROM orders AS __source) AS orders) AS orders WHERE orders.o_custkey = 1) AS orders RIGHT JOIN (SELECT __source.c_custkey AS c_custkey FROM customer AS __source) AS customer ON orders.o_custkey = customer.c_custkey) AS __relation__1 GROUP BY __relation__1.c_custkey) AS totalprice) AS customer", + ); Ok(()) }