Skip to content

Commit

Permalink
Add parser option enable_options_value_normalization (#11330)
Browse files Browse the repository at this point in the history
* draft option enable_options_value_normalization

* Add unit tests

* Fix ci

* Fix bad merge

* Update configs.md

* Fix ci 2

* Fix doc gen

* Fix comments

* Fix ut

* fix format

* fix fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
xinlifoobar and alamb committed Jul 25, 2024
1 parent 6fd57b2 commit d452d51
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 91 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ config_namespace! {
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true

/// When set to true, SQL parser will normalize options value (convert value to lowercase)
pub enable_options_value_normalization: bool, default = true

/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ impl SessionState {
ParserOptions {
parse_float_as_decimal: sql_parser_options.parse_float_as_decimal,
enable_ident_normalization: sql_parser_options.enable_ident_normalization,
enable_options_value_normalization: sql_parser_options
.enable_options_value_normalization,
support_varchar_with_length: sql_parser_options.support_varchar_with_length,
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/cte.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Process CTEs from top to bottom
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = self.normalizer.normalize(cte.alias.name.clone());
let cte_name = self.ident_normalizer.normalize(cte.alias.name.clone());
if planner_context.contains_cte(&cte_name) {
return plan_err!(
"WITH query name {cte_name:?} specified more than once"
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// interpret names with '.' as if they were
// compound identifiers, but this is not a compound
// identifier. (e.g. it is "foo.bar" not foo.bar)
let normalize_ident = self.normalizer.normalize(id);
let normalize_ident = self.ident_normalizer.normalize(id);

// Check for qualified field with unqualified name
if let Ok((qualifier, _)) =
Expand Down Expand Up @@ -96,7 +96,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if ids[0].value.starts_with('@') {
let var_names: Vec<_> = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect();
let ty = self
.context_provider
Expand All @@ -110,7 +110,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
} else {
let ids = ids
.into_iter()
.map(|id| self.normalizer.normalize(id))
.map(|id| self.ident_normalizer.normalize(id))
.collect::<Vec<_>>();

// Currently not supporting more than one nested level
Expand Down
58 changes: 46 additions & 12 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use arrow_schema::*;
use datafusion_common::{
field_not_found, internal_err, plan_datafusion_err, DFSchemaRef, SchemaError,
};
use sqlparser::ast::TimezoneInfo;
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
use sqlparser::ast::{TimezoneInfo, Value};

use datafusion_common::TableReference;
use datafusion_common::{
Expand All @@ -38,8 +38,7 @@ use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::utils::find_column_exprs;
use datafusion_expr::{col, Expr};

use crate::utils::make_decimal_type;

use crate::utils::{make_decimal_type, value_to_string};
pub use datafusion_expr::planner::ContextProvider;

/// SQL parser options
Expand All @@ -48,6 +47,7 @@ pub struct ParserOptions {
pub parse_float_as_decimal: bool,
pub enable_ident_normalization: bool,
pub support_varchar_with_length: bool,
pub enable_options_value_normalization: bool,
}

impl Default for ParserOptions {
Expand All @@ -56,6 +56,7 @@ impl Default for ParserOptions {
parse_float_as_decimal: false,
enable_ident_normalization: true,
support_varchar_with_length: true,
enable_options_value_normalization: true,
}
}
}
Expand Down Expand Up @@ -86,6 +87,32 @@ impl IdentNormalizer {
}
}

/// Value Normalizer
#[derive(Debug)]
pub struct ValueNormalizer {
normalize: bool,
}

impl Default for ValueNormalizer {
fn default() -> Self {
Self { normalize: true }
}
}

impl ValueNormalizer {
pub fn new(normalize: bool) -> Self {
Self { normalize }
}

pub fn normalize(&self, value: Value) -> Option<String> {
match (value_to_string(&value), self.normalize) {
(Some(s), true) => Some(s.to_ascii_lowercase()),
(Some(s), false) => Some(s),
(None, _) => None,
}
}
}

/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
Expand Down Expand Up @@ -184,7 +211,8 @@ impl PlannerContext {
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) normalizer: IdentNormalizer,
pub(crate) ident_normalizer: IdentNormalizer,
pub(crate) value_normalizer: ValueNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
Expand All @@ -195,12 +223,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

/// Create a new query planner
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let normalize = options.enable_ident_normalization;
let ident_normalize = options.enable_ident_normalization;
let options_value_normalize = options.enable_options_value_normalization;

SqlToRel {
context_provider,
options,
normalizer: IdentNormalizer::new(normalize),
ident_normalizer: IdentNormalizer::new(ident_normalize),
value_normalizer: ValueNormalizer::new(options_value_normalize),
}
}

Expand All @@ -214,7 +244,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.any(|x| x.option == ColumnOption::NotNull);
fields.push(Field::new(
self.normalizer.normalize(column.name),
self.ident_normalizer.normalize(column.name),
data_type,
!not_nullable,
));
Expand Down Expand Up @@ -252,8 +282,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let default_expr = self
.sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
.map_err(error_desc)?;
column_defaults
.push((self.normalizer.normalize(column.name.clone()), default_expr));
column_defaults.push((
self.ident_normalizer.normalize(column.name.clone()),
default_expr,
));
}
}
Ok(column_defaults)
Expand All @@ -268,7 +300,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = self.apply_expr_alias(plan, alias.columns)?;

LogicalPlanBuilder::from(plan)
.alias(TableReference::bare(self.normalizer.normalize(alias.name)))?
.alias(TableReference::bare(
self.ident_normalizer.normalize(alias.name),
))?
.build()
}

Expand All @@ -289,7 +323,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let fields = plan.schema().fields().clone();
LogicalPlanBuilder::from(plan)
.project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
col(field.name()).alias(self.normalizer.normalize(ident))
col(field.name()).alias(self.ident_normalizer.normalize(ident))
}))?
.build()
}
Expand Down Expand Up @@ -415,7 +449,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
None => Ident::new(format!("c{idx}"))
};
Ok(Arc::new(Field::new(
self.normalizer.normalize(field_name),
self.ident_normalizer.normalize(field_name),
data_type,
true,
)))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
JoinConstraint::Using(idents) => {
let keys: Vec<Column> = idents
.into_iter()
.map(|x| Column::from_name(self.normalizer.normalize(x)))
.map(|x| Column::from_name(self.ident_normalizer.normalize(x)))
.collect();
LogicalPlanBuilder::from(left)
.join_using(right, join_type, keys)?
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
&[&[plan.schema()]],
&plan.using_columns()?,
)?;
let name = self.normalizer.normalize(alias);
let name = self.ident_normalizer.normalize(alias);
// avoiding adding an alias if the column name is the same.
let expr = match &col {
Expr::Column(column) if column.name.eq(&name) => col,
Expand Down
108 changes: 37 additions & 71 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,6 @@ fn ident_to_string(ident: &Ident) -> String {
normalize_ident(ident.to_owned())
}

fn value_to_string(value: &Value) -> Option<String> {
match value {
Value::SingleQuotedString(s) => Some(s.to_string()),
Value::DollarQuotedString(s) => Some(s.to_string()),
Value::Number(_, _) | Value::Boolean(_) => Some(value.to_string()),
Value::DoubleQuotedString(_)
| Value::EscapedStringLiteral(_)
| Value::NationalStringLiteral(_)
| Value::SingleQuotedByteStringLiteral(_)
| Value::DoubleQuotedByteStringLiteral(_)
| Value::TripleSingleQuotedString(_)
| Value::TripleDoubleQuotedString(_)
| Value::TripleSingleQuotedByteStringLiteral(_)
| Value::TripleDoubleQuotedByteStringLiteral(_)
| Value::SingleQuotedRawStringLiteral(_)
| Value::DoubleQuotedRawStringLiteral(_)
| Value::TripleSingleQuotedRawStringLiteral(_)
| Value::TripleDoubleQuotedRawStringLiteral(_)
| Value::HexStringLiteral(_)
| Value::Null
| Value::Placeholder(_) => None,
}
}

fn object_name_to_string(object_name: &ObjectName) -> String {
object_name
.0
Expand Down Expand Up @@ -881,25 +857,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
};

let mut options = HashMap::new();
for (key, value) in statement.options {
let value_string = match value_to_string(&value) {
None => {
return plan_err!("Unsupported Value in COPY statement {}", value);
}
Some(v) => v,
};

if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key);
options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
} else {
options.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map = self.parse_options_map(statement.options, true)?;

let maybe_file_type = if let Some(stored_as) = &statement.stored_as {
if let Ok(ext_file_type) = self.context_provider.get_file_type(stored_as) {
Expand Down Expand Up @@ -946,7 +904,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
output_url: statement.target,
file_type,
partition_by,
options,
options: options_map,
}))
}

Expand Down Expand Up @@ -1007,29 +965,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let inline_constraints = calc_inline_constraints_from_columns(&columns);
all_constraints.extend(inline_constraints);

let mut options_map = HashMap::<String, String>::new();
for (key, value) in options {
if options_map.contains_key(&key) {
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = value_to_string(&value) else {
return plan_err!(
"Unsupported Value in CREATE EXTERNAL TABLE statement {}",
value
);
};

if !(&key.contains('.')) {
// If a config does not belong to any namespace, we assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key.to_lowercase());
options_map.insert(renamed_key, value_string.to_lowercase());
} else {
options_map.insert(key.to_lowercase(), value_string.to_lowercase());
}
}
let options_map = self.parse_options_map(options, false)?;

let compression = options_map
.get("format.compression")
Expand Down Expand Up @@ -1081,6 +1017,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
)))
}

fn parse_options_map(
&self,
options: Vec<(String, Value)>,
allow_duplicates: bool,
) -> Result<HashMap<String, String>> {
let mut options_map = HashMap::new();
for (key, value) in options {
if !allow_duplicates && options_map.contains_key(&key) {
return plan_err!("Option {key} is specified multiple times");
}

let Some(value_string) = self.value_normalizer.normalize(value.clone())
else {
return plan_err!("Unsupported Value {}", value);
};

if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
let renamed_key = format!("format.{}", key);
options_map.insert(renamed_key.to_lowercase(), value_string);
} else {
options_map.insert(key.to_lowercase(), value_string);
}
}

Ok(options_map)
}

/// Generate a plan for EXPLAIN ... that will print out a plan
///
/// Note this is the sqlparser explain statement, not the
Expand Down Expand Up @@ -1204,7 +1170,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// parse value string from Expr
let value_string = match &value[0] {
SQLExpr::Identifier(i) => ident_to_string(i),
SQLExpr::Value(v) => match value_to_string(v) {
SQLExpr::Value(v) => match crate::utils::value_to_string(v) {
None => {
return plan_err!("Unsupported Value {}", value[0]);
}
Expand Down Expand Up @@ -1365,8 +1331,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
None => {
// If the target table has an alias, use it to qualify the column name
if let Some(alias) = &table_alias {
Expr::Column(Column::new(
Some(self.normalizer.normalize(alias.name.clone())),
datafusion_expr::Expr::Column(Column::new(
Some(self.ident_normalizer.normalize(alias.name.clone())),
field.name(),
))
} else {
Expand Down Expand Up @@ -1421,7 +1387,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let mut value_indices = vec![None; table_schema.fields().len()];
let fields = columns
.into_iter()
.map(|c| self.normalizer.normalize(c))
.map(|c| self.ident_normalizer.normalize(c))
.enumerate()
.map(|(i, c)| {
let column_index = table_schema
Expand Down
Loading

0 comments on commit d452d51

Please sign in to comment.