Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ContextProvider naming: rename get_table_provider --> get_table_source, deprecate get_table_provider #7831

Merged
merged 4 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ struct MyContextProvider {
}

impl ContextProvider for MyContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
if name.table() == "person" {
Ok(Arc::new(MyTableSource {
schema: Arc::new(Schema::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2017,7 +2017,7 @@ struct SessionContextProvider<'a> {
}

impl<'a> ContextProvider for SessionContextProvider<'a> {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let name = self.state.resolve_table_ref(name).to_string();
self.tables
.get(&name)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ struct MySchemaProvider {
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let table_name = name.table();
if table_name.starts_with("test") {
let schema = Schema::new_with_metadata(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/examples/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
}

impl ContextProvider for MySchemaProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => plan_err!("Table not found: {}", name.table()),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sql/src/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};

// user-defined function (UDF) should have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.schema_provider.get_function_meta(&name) {
if let Some(fm) = self.context_provider.get_function_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
return Ok(Expr::ScalarUDF(ScalarUDF::new(fm, args)));
Expand Down Expand Up @@ -125,7 +125,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
} else {
// User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
if let Some(fm) = self.schema_provider.get_aggregate_meta(&name) {
if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
let args =
self.function_args_to_expr(function.args, schema, planner_context)?;
return Ok(Expr::AggregateUDF(expr::AggregateUDF::new(
Expand Down Expand Up @@ -178,13 +178,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
window_function::find_df_window_func(name)
// next check user defined aggregates
.or_else(|| {
self.schema_provider
self.context_provider
.get_aggregate_meta(name)
.map(WindowFunction::AggregateUDF)
})
// next check user defined window functions
.or_else(|| {
self.schema_provider
self.context_provider
.get_window_meta(name)
.map(WindowFunction::WindowUDF)
})
Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// TODO: figure out if ScalarVariables should be insensitive.
let var_names = vec![id.value];
let ty = self
.schema_provider
.context_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Plan(format!(
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.map(|id| self.normalizer.normalize(id))
.collect();
let ty = self
.schema_provider
.context_provider
.get_variable_type(&var_names)
.ok_or_else(|| {
DataFusionError::Execution(format!(
Expand Down
5 changes: 1 addition & 4 deletions datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,7 @@ mod tests {
}

impl ContextProvider for TestSchemaProvider {
fn get_table_provider(
&self,
name: TableReference,
) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
match self.tables.get(name.table()) {
Some(table) => Ok(table.clone()),
_ => plan_err!("Table not found: {}", name.table()),
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::utils::make_decimal_type;
/// functions referenced in SQL statements
pub trait ContextProvider {
/// Getter for a datasource
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>>;
lewiszlw marked this conversation as resolved.
Show resolved Hide resolved
/// Getter for a UDF description
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
/// Getter for a UDAF description
Expand Down Expand Up @@ -186,7 +186,7 @@ impl PlannerContext {

/// SQL query planner
pub struct SqlToRel<'a, S: ContextProvider> {
pub(crate) schema_provider: &'a S,
pub(crate) context_provider: &'a S,
pub(crate) options: ParserOptions,
pub(crate) normalizer: IdentNormalizer,
}
Expand All @@ -201,7 +201,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
pub fn new_with_options(schema_provider: &'a S, options: ParserOptions) -> Self {
let normalize = options.enable_ident_normalization;
SqlToRel {
schema_provider,
context_provider: schema_provider,
options,
normalizer: IdentNormalizer::new(normalize),
}
Expand Down Expand Up @@ -334,7 +334,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// Timestamp With Time Zone
// INPUT : [SQLDataType] TimestampTz + [RuntimeConfig] Time Zone
// OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
self.schema_provider.options().execution.time_zone.clone()
self.context_provider.options().execution.time_zone.clone()
} else {
// Timestamp Without Time zone
None
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
(
match (
cte,
self.schema_provider.get_table_provider(table_ref.clone()),
self.context_provider.get_table_source(table_ref.clone()),
) {
(Some(cte_plan), _) => Ok(cte_plan.clone()),
(_, Ok(provider)) => {
Expand Down
37 changes: 18 additions & 19 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let DescribeTableStmt { table_name } = statement;
let table_ref = self.object_name_to_table_reference(table_name)?;

let table_source = self.schema_provider.get_table_provider(table_ref)?;
let table_source = self.context_provider.get_table_source(table_ref)?;

let schema = table_source.schema();

Expand All @@ -630,7 +630,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
CopyToSource::Relation(object_name) => {
let table_ref =
self.object_name_to_table_reference(object_name.clone())?;
let table_source = self.schema_provider.get_table_provider(table_ref)?;
let table_source = self.context_provider.get_table_source(table_ref)?;
LogicalPlanBuilder::scan(
object_name_to_string(&object_name),
table_source,
Expand Down Expand Up @@ -912,12 +912,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(table_name.clone())?;
let provider = self.schema_provider.get_table_provider(table_ref.clone())?;
let schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_ref.clone())?;
let schema = (*table_source.schema()).clone();
let schema = DFSchema::try_from(schema)?;
let scan =
LogicalPlanBuilder::scan(object_name_to_string(&table_name), provider, None)?
.build()?;
let scan = LogicalPlanBuilder::scan(
object_name_to_string(&table_name),
table_source,
None,
)?
.build()?;
let mut planner_context = PlannerContext::new();

let source = match predicate_expr {
Expand Down Expand Up @@ -960,10 +963,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
.get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_name.clone())?;
let arrow_schema = (*table_source.schema()).clone();
let table_schema = Arc::new(DFSchema::try_from_qualified_schema(
table_name.clone(),
&arrow_schema,
Expand Down Expand Up @@ -1064,10 +1065,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
let provider = self
.schema_provider
.get_table_provider(table_name.clone())?;
let arrow_schema = (*provider.schema()).clone();
let table_source = self.context_provider.get_table_source(table_name.clone())?;
let arrow_schema = (*table_source.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;

// Get insert fields and index_mapping
Expand Down Expand Up @@ -1193,7 +1192,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
let _ = self.schema_provider.get_table_provider(table_ref)?;
let _ = self.context_provider.get_table_source(table_ref)?;

// treat both FULL and EXTENDED as the same
let select_list = if full || extended {
Expand Down Expand Up @@ -1228,7 +1227,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Do a table lookup to verify the table exists
let table_ref = self.object_name_to_table_reference(sql_table_name)?;
let _ = self.schema_provider.get_table_provider(table_ref)?;
let _ = self.context_provider.get_table_source(table_ref)?;

let query = format!(
"SELECT table_catalog, table_schema, table_name, definition FROM information_schema.views WHERE {where_clause}"
Expand All @@ -1245,8 +1244,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
schema: schema.into(),
table: table.into(),
};
self.schema_provider
.get_table_provider(tables_reference)
self.context_provider
.get_table_source(tables_reference)
.is_ok()
}
}
2 changes: 1 addition & 1 deletion datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2691,7 +2691,7 @@ struct MockContextProvider {
}

impl ContextProvider for MockContextProvider {
fn get_table_provider(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let schema = match name.table() {
"test" => Ok(Schema::new(vec![
Field::new("t_date32", DataType::Date32, false),
Expand Down