Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ContextProvider naming: rename get_table_provider --> get_table_source, deprecate get_table_provider #7831

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ struct MyContextProvider {
}

impl ContextProvider for MyContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
if name.table() == "person" {
Ok(Arc::new(MyTableSource {
schema: Arc::new(Schema::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ struct SessionContextProvider<'a> {
}

impl<'a> ContextProvider for SessionContextProvider<'a> {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let name = self.state.resolve_table_ref(name).to_string();
self.tables
.get(&name)
Expand Down
10 changes: 5 additions & 5 deletions datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let statement = &ast[0];

// create a logical query plan
let schema_provider = MySchemaProvider::default();
let sql_to_rel = SqlToRel::new(&schema_provider);
let context_provider = MyContextProvider::default();
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// hard code the return value of now()
Expand All @@ -357,12 +357,12 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
}

#[derive(Default)]
struct MySchemaProvider {
struct MyContextProvider {
options: ConfigOptions,
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let table_name = name.table();
if table_name.starts_with("test") {
let schema = Schema::new_with_metadata(
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ fn main() {
let statement = &ast[0];

// create a logical query plan
let schema_provider = MySchemaProvider::new();
let sql_to_rel = SqlToRel::new(&schema_provider);
let context_provider = MyContextProvider::new();
let sql_to_rel = SqlToRel::new(&context_provider);
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// show the plan
println!("{plan:?}");
}

struct MySchemaProvider {
struct MyContextProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}

impl MySchemaProvider {
impl MyContextProvider {
fn new() -> Self {
let mut tables = HashMap::new();
tables.insert(
Expand Down Expand Up @@ -104,8 +104,8 @@ fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
)))
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => plan_err!("Table not found: {}", name.table()),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};

// user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.schema_provider.get_function_meta(&name) {
if let Some(fm) = self.context_provider.get_function_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
return Ok(Expr::ScalarUDF(ScalarUDF::new(fm, args)));
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
} else {
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.schema_provider.get_aggregate_meta(&name) {
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
return Ok(Expr::AggregateUDF(expr::AggregateUDF::new(
Expand Down Expand Up @@ -178,13 +178,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
window_function::find_df_window_func(name)
// next check user defined aggregates
.or_else(|| {
self.schema_provider
self.context_provider
.get_aggregate_meta(name)
.map(WindowFunction::AggregateUDF)
})
// next check user defined window functions
.or_else(|| {
self.schema_provider
self.context_provider
.get_window_meta(name)
.map(WindowFunction::WindowUDF)
})
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// TODO: figure out if ScalarVariables should be insensitive.
let var_names = vec![id.value];
let ty = self
.schema_provider
.context_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Plan(format!(
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map(|id| self.normalizer.normalize(id))
.collect();
let ty = self
.schema_provider
.context_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Execution(format!(
Expand Down
15 changes: 6 additions & 9 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,12 +772,12 @@ mod tests {

use crate::TableReference;

struct TestSchemaProvider {
struct TestContextProvider {
options: ConfigOptions,
tables: HashMap<String, Arc<dyn TableSource>>,
}

impl TestSchemaProvider {
impl TestContextProvider {
pub fn new() -> Self {
let mut tables = HashMap::new();
tables.insert(
Expand All @@ -796,11 +796,8 @@ mod tests {
}
}

impl ContextProvider for TestSchemaProvider {
fn get_table_provider(
&self,
name: TableReference,
) -> Result<Arc<dyn TableSource>> {
impl ContextProvider for TestContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => plan_err!("Table not found: {}", name.table()),
Expand Down Expand Up @@ -853,8 +850,8 @@ mod tests {
.unwrap();
let sql_expr = parser.parse_expr().unwrap();

let schema_provider = TestSchemaProvider::new();
let sql_to_rel = SqlToRel::new(&schema_provider);
let context_provider = TestContextProvider::new();
let sql_to_rel = SqlToRel::new(&context_provider);

// Should not stack overflow
sql_to_rel.sql_expr_to_logical_expr(
Expand Down
18 changes: 11 additions & 7 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ use crate::utils::make_decimal_type;
/// The ContextProvider trait allows the query planner to obtain meta-data about tables and
/// functions referenced in SQL statements
pub trait ContextProvider {
#[deprecated(since = "32.0.0", note = "please use `get_table_source` instead")]
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
self.get_table_source(name)
}
/// Getter for a datasource
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;
lewiszlw marked this conversation as resolved.
Show resolved Hide resolved
/// Getter for a UDF description
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
/// Getter for a UDAF description
Expand Down Expand Up @@ -186,22 +190,22 @@ impl PlannerContext {

/// SQL query planner
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) schema_provider: &'a S,
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) normalizer: IdentNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
/// Create a new query planner
pub fn new(schema_provider: &'a S) -> Self {
Self::new_with_options(schema_provider, ParserOptions::default())
pub fn new(context_provider: &'a S) -> Self {
Self::new_with_options(context_provider, ParserOptions::default())
}

/// Create a new query planner
pub fn new_with_options(schema_provider: &'a S, options: ParserOptions) -> Self {
pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
let normalize = options.enable_ident_normalization;
SqlToRel {
schema_provider,
context_provider,
options,
normalizer: IdentNormalizer::new(normalize),
}
Expand Down Expand Up @@ -334,7 +338,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Timestamp With Time Zone
// INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
self.schema_provider.options().execution.time_zone.clone()
self.context_provider.options().execution.time_zone.clone()
} else {
// Timestamp Without Time zone
None
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
(
match (
cte,
self.schema_provider.get_table_provider(table_ref.clone()),
self.context_provider.get_table_source(table_ref.clone()),
) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Ok(provider)) => {
Expand Down
37 changes: 18 additions & 19 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let DescribeTableStmt { table_name } = statement;
let table_ref = self.object_name_to_table_reference(table_name)?;

let table_source = self.schema_provider.get_table_provider(table_ref)?;
let table_source = self.context_provider.get_table_source(table_ref)?;

let schema = table_source.schema();

Expand All @@ -630,7 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
CopyToSource::Relation(object_name) => {
let table_ref =
self.object_name_to_table_reference(object_name.clone())?;
let table_source = self.schema_provider.get_table_provider(table_ref)?;
let table_source = self.context_provider.get_table_source(table_ref)?;
LogicalPlanBuilder::scan(
object_name_to_string(&object_name),
table_source,
Expand Down Expand Up @@ -912,12 +912,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(table_name.clone())?;
let provider = self.schema_provider.get_table_provider(table_ref.clone())?;
let schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_ref.clone())?;
let schema = (*table_source.schema()).clone();
let schema = DFSchema::try_from(schema)?;
let scan =
LogicalPlanBuilder::scan(object_name_to_string(&table_name), provider, None)?
.build()?;
let scan = LogicalPlanBuilder::scan(
object_name_to_string(&table_name),
table_source,
None,
)?
.build()?;
let mut planner_context = PlannerContext::new();

let source = match predicate_expr {
Expand Down Expand Up @@ -960,10 +963,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
.get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_name.clone())?;
let arrow_schema = (*table_source.schema()).clone();
let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
table_name.clone(),
&arrow_schema,
Expand Down Expand Up @@ -1064,10 +1065,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
.get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_name.clone())?;
let arrow_schema = (*table_source.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;

// Get insert fields and index_mapping
Expand Down Expand Up @@ -1193,7 +1192,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
let _ = self.schema_provider.get_table_provider(table_ref)?;
let _ = self.context_provider.get_table_source(table_ref)?;

// treat both FULL and EXTENDED as the same
let select_list = if full || extended {
Expand Down Expand Up @@ -1228,7 +1227,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
let _ = self.schema_provider.get_table_provider(table_ref)?;
let _ = self.context_provider.get_table_source(table_ref)?;

let query = format!(
"SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
Expand All @@ -1245,8 +1244,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: schema.into(),
table: table.into(),
};
self.schema_provider
.get_table_provider(tables_reference)
self.context_provider
.get_table_source(tables_reference)
.is_ok()
}
}
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2691,7 +2691,7 @@ struct MockContextProvider {
}

impl ContextProvider for MockContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let schema = match name.table() {
"test" => Ok(Schema::new(vec![
Field::new("t_date32", DataType::Date32, false),
Expand Down
Loading