Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions wren-core-base/src/mdl/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,19 @@ impl Model {
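/// Physical columns are columns that can be selected from the model,
/// i.e. columns that are not relationship columns.
/// Not all visible columns are physical columns: relationship columns are excluded.
/// When `show_visible_only` is true, only the columns returned by `get_visible_columns`
/// are considered; otherwise every column declared on the model is considered.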
pub fn get_physical_columns(&self) -> Vec<Arc<Column>> {
self.get_visible_columns()
.filter(|c| c.relationship.is_none())
.map(|c| Arc::clone(&c))
.collect()
pub fn get_physical_columns(&self, show_visible_only: bool) -> Vec<Arc<Column>> {
    // Relationship columns are never physical, whichever source list is used.
    if !show_visible_only {
        // Start from every column declared on the model.
        return self
            .columns
            .iter()
            .filter(|column| column.relationship.is_none())
            .map(|column| Arc::clone(&column))
            .collect();
    }

    // Start from the columns yielded by `get_visible_columns`.
    self.get_visible_columns()
        .filter(|column| column.relationship.is_none())
        .map(|column| Arc::clone(&column))
        .collect()
}

/// Return the name of the model
Expand All @@ -267,12 +275,19 @@ impl Model {
}

/// Get the specified visible column by name
pub fn get_column(&self, column_name: &str) -> Option<Arc<Column>> {
pub fn get_visible_column(&self, column_name: &str) -> Option<Arc<Column>> {
    // Walk the columns yielded by `get_visible_columns` and hand back the first
    // one whose name matches exactly; `None` when nothing matches.
    for column in self.get_visible_columns() {
        if column.name == column_name {
            return Some(Arc::clone(&column));
        }
    }
    None
}

/// Get the specified column by name.
///
/// Searches every column declared on the model (`self.columns`) without
/// applying any visibility filter, and returns the first exact name match.
pub fn get_column(&self, column_name: &str) -> Option<Arc<Column>> {
    for column in self.columns.iter() {
        if column.name == column_name {
            return Some(Arc::clone(column));
        }
    }
    None
}

/// Return the primary key of the model
pub fn primary_key(&self) -> Option<&str> {
self.primary_key.as_deref()
Expand Down
9 changes: 7 additions & 2 deletions wren-core/core/src/logical_plan/analyze/access_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ pub fn build_filter_expression(
if let Some(error) = error {
return error;
}
let df_schema = Dataset::Model(Arc::clone(&model)).to_qualified_schema()?;
// The condition could contain hidden columns, so we need to build the schema with hidden columns included.
let df_schema = Dataset::Model(Arc::clone(&model)).to_qualified_schema(false)?;
session_state
.read()
.create_logical_expr(&expr.to_string(), &df_schema)
Expand Down Expand Up @@ -253,6 +254,10 @@ pub fn validate_rule(
required_properties: &[SessionProperty],
headers: &HashMap<String, Option<String>>,
) -> Result<bool> {
if required_properties.is_empty() {
return Ok(true);
}

let exists = required_properties
.iter()
.map(|property| {
Expand Down Expand Up @@ -336,7 +341,7 @@ pub(crate) fn validate_clac_rule(
else {
return plan_err!("Model {} not found", model_name.table());
};
let Some(ref_column) = ref_model.get_column(field.name()) else {
let Some(ref_column) = ref_model.get_visible_column(field.name()) else {
return plan_err!(
"Column {}.{} not found",
model_name.table(),
Expand Down
42 changes: 23 additions & 19 deletions wren-core/core/src/logical_plan/analyze/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,16 @@ impl ModelPlanNodeBuilder {
let required_fields =
self.add_required_columns_from_session_properties(&model, required_fields)?;

// `required_fields` could contain hidden columns, so we need to look them up among all physical columns, not only the visible ones.
let required_columns =
model.get_physical_columns().into_iter().filter(|column| {
required_fields
.iter()
.any(|expr| is_required_column(expr, column.name()))
});
model
.get_physical_columns(false)
.into_iter()
.filter(|column| {
required_fields
.iter()
.any(|expr| is_required_column(expr, column.name()))
});
for column in required_columns {
// Actually, this is only checked in PermissionAnalyze mode.
// In Unparse or LocalRuntime mode, an invalid column won't be registered in the table provider.
Expand Down Expand Up @@ -234,8 +238,9 @@ impl ModelPlanNodeBuilder {
self.required_calculation.push(calculation);
// insert the primary key to the required fields for join with the calculation

let Some(pk_column) =
model.primary_key().and_then(|pk| model.get_column(pk))
let Some(pk_column) = model
.primary_key()
.and_then(|pk| model.get_visible_column(pk))
else {
return plan_err!(
"Primary key not found for model {}. To use `TO_MANY` relationship, the primary key is required for the base model.",
Expand Down Expand Up @@ -916,7 +921,7 @@ impl ModelSourceNode {
} else {
Arc::clone(&model)
};
for column in model.get_physical_columns().into_iter() {
for column in model.get_physical_columns(false).into_iter() {
// skip the calculated field
if column.is_calculated {
continue;
Expand All @@ -938,16 +943,13 @@ impl ModelSourceNode {
)?));
}
} else {
let Some(column) =
model
.get_physical_columns()
.into_iter()
.find(|column| match expr {
Expr::Column(c) => c.name.as_str() == column.name(),
Expr::Alias(alias) => alias.name.as_str() == column.name(),
_ => false,
})
else {
let Some(column) = model.get_physical_columns(false).into_iter().find(
|column| match expr {
Expr::Column(c) => c.name.as_str() == column.name(),
Expr::Alias(alias) => alias.name.as_str() == column.name(),
_ => false,
},
) else {
return plan_err!("Field not found {}", expr);
};
if column.is_calculated {
Expand Down Expand Up @@ -1052,7 +1054,9 @@ impl CalculationPlanNode {
let Some(model) = calculation.dataset.try_as_model() else {
return plan_err!("Only support model as source dataset");
};
let Some(pk_column) = model.primary_key().and_then(|pk| model.get_column(pk))
let Some(pk_column) = model
.primary_key()
.and_then(|pk| model.get_visible_column(pk))
else {
return plan_err!("Primary key not found");
};
Expand Down
2 changes: 1 addition & 1 deletion wren-core/core/src/mdl/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl WrenDataSource {
mode: &Mode,
) -> Result<Self> {
let available_columns = model
.get_physical_columns()
.get_physical_columns(true)
.iter()
.map(|column| {
if mode.is_permission_analyze()
Expand Down
6 changes: 3 additions & 3 deletions wren-core/core/src/mdl/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ impl Dataset {
}
}

pub fn to_qualified_schema(&self) -> Result<DFSchema> {
pub fn to_qualified_schema(&self, show_visible_only: bool) -> Result<DFSchema> {
match self {
Dataset::Model(model) => {
let fields: Vec<_> = model
.get_physical_columns()
.get_physical_columns(show_visible_only)
.iter()
.map(|c| to_field(c))
.collect::<Result<_>>()?;
Expand Down Expand Up @@ -60,7 +60,7 @@ impl Dataset {
DFSchema::try_from_qualified_schema(model.table_reference(), &schema)
} else {
let fields: Vec<Field> = model
.get_physical_columns()
.get_physical_columns(true)
.iter()
.filter(|c| !c.is_calculated)
.map(|c| to_remote_field(c, Arc::clone(&session_state)))
Expand Down
48 changes: 48 additions & 0 deletions wren-core/core/src/mdl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2644,6 +2644,54 @@ mod test {
Ok(())
}

/// A row-level access control condition may reference a hidden column:
/// the generated SQL filters on it, but the column stays out of the outer
/// projection and cannot be selected directly.
#[tokio::test]
async fn test_ralc_condition_contain_hidden() -> Result<()> {
    let ctx = SessionContext::new();

    // `c_name` is declared hidden, yet the access-control rule below filters on it.
    let manifest = ManifestBuilder::new()
        .catalog("wren")
        .schema("test")
        .model(
            ModelBuilder::new("customer")
                .table_reference("customer")
                .column(ColumnBuilder::new("c_custkey", "int").build())
                .column(ColumnBuilder::new("c_name", "string").hidden(true).build())
                .add_row_level_access_control("hidden condition", vec![], "c_name = 'Peko'")
                .build(),
        )
        .build();

    let headers = SessionPropertiesRef::default();
    let analyzed = AnalyzedWrenMDL::analyze(manifest.clone(), headers.clone(), Mode::Unparse)?;
    let analyzed_mdl = Arc::new(analyzed);

    // `SELECT *` expands to the visible column only, while the inner query
    // still carries `c_name` so the rule's predicate can be applied.
    let sql = "SELECT * FROM customer";
    let transformed = transform_sql_with_ctx(
        &ctx,
        Arc::clone(&analyzed_mdl),
        &[],
        Arc::clone(&headers),
        sql,
    )
    .await?;
    assert_snapshot!(
        transformed,
        @"SELECT customer.c_custkey FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT customer.c_custkey, customer.c_name FROM (SELECT __source.c_custkey AS c_custkey, __source.c_name AS c_name FROM customer AS __source) AS customer) AS customer WHERE customer.c_name = 'Peko') AS customer"
    );

    // Selecting the hidden column directly must be rejected.
    let sql = "SELECT c_name FROm customer";
    let Err(e) = transform_sql_with_ctx(&ctx, analyzed_mdl, &[], headers, sql).await else {
        panic!("Expected error")
    };
    assert_snapshot!(
        e.to_string(),
        @"Schema error: No field named c_name. Valid fields are customer.c_custkey."
    );
    Ok(())
}

#[tokio::test]
async fn test_clac_with_required_properties() -> Result<()> {
let ctx = SessionContext::new();
Expand Down
4 changes: 2 additions & 2 deletions wren-core/core/src/mdl/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub fn create_wren_calculated_field_expr(
.map(|m| analyzed_wren_mdl.wren_mdl().get_model(&m))
.filter(|m| m.is_some())
.map(|m| Dataset::Model(m.unwrap()))
.map(|m| m.to_qualified_schema())
.map(|m| m.to_qualified_schema(true))
.reduce(|acc, schema| acc?.join(&schema?))
.transpose()?
else {
Expand Down Expand Up @@ -163,7 +163,7 @@ pub(crate) fn create_wren_expr_for_model(
session_state: SessionStateRef,
) -> Result<Expr> {
let dataset = Dataset::Model(model);
let schema = dataset.to_qualified_schema()?;
let schema = dataset.to_qualified_schema(true)?;
let session_state = session_state.read();
session_state.create_logical_expr(
qualified_expr(expr, &schema, &session_state)?.as_str(),
Expand Down