-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-12037: [Rust] [DataFusion] Support catalogs and schemas for table namespacing #9762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
36b1703
866e85f
7ab647e
02e83b3
4f000f3
fecf498
1460598
8d30ee4
5e1e942
cf5b846
0f33b6f
58451a0
d79bdd5
66de8d3
a18829f
03048c6
0870acf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -157,9 +157,9 @@ async fn benchmark(opt: BenchmarkOpt) -> Result<Vec<arrow::record_batch::RecordB | |
| table, | ||
| start.elapsed().as_millis() | ||
| ); | ||
| ctx.register_table(table, Arc::new(memtable)); | ||
| ctx.register_table(*table, Arc::new(memtable))?; | ||
| } else { | ||
| ctx.register_table(table, table_provider); | ||
| ctx.register_table(*table, table_provider)?; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1105,7 +1105,7 @@ fn get_table( | |
| table: &str, | ||
| table_format: &str, | ||
| max_concurrency: usize, | ||
| ) -> Result<Arc<dyn TableProvider + Send + Sync>> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FWIW I was confused about this change initially until I saw that you added |
||
| ) -> Result<Arc<dyn TableProvider>> { | ||
| match table_format { | ||
| // dbgen creates .tbl ('|' delimited) files without header | ||
| "tbl" => { | ||
|
|
@@ -1614,7 +1614,7 @@ mod tests { | |
|
|
||
| let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch]])?; | ||
|
|
||
| ctx.register_table(table, Arc::new(provider)); | ||
| ctx.register_table(table, Arc::new(provider))?; | ||
| } | ||
|
|
||
| let plan = create_logical_plan(&mut ctx, n)?; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! Describes the interface and built-in implementations of catalogs, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| //! representing collections of named schemas. | ||
| use crate::catalog::schema::SchemaProvider; | ||
| use std::any::Any; | ||
| use std::collections::HashMap; | ||
| use std::sync::{Arc, RwLock}; | ||
|
|
||
| /// Represents a catalog, comprising a number of named schemas. | ||
| pub trait CatalogProvider: Sync + Send { | ||
| /// Returns the catalog provider as [`Any`](std::any::Any) | ||
| /// so that it can be downcast to a specific implementation. | ||
| fn as_any(&self) -> &dyn Any; | ||
|
|
||
| /// Retrieves the list of available schema names in this catalog. | ||
| fn schema_names(&self) -> Vec<String>; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about returning something more like Ideally it would be nice if we could do something like As all uses of the results here will need to iterate over the names I suspect
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I iniitally tried to implement these returning
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Vec<&str> might be possible |
||
|
|
||
| /// Retrieves a specific schema from the catalog by name, provided it exists. | ||
| fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>; | ||
| } | ||
|
|
||
| /// Simple in-memory implementation of a catalog. | ||
| pub struct MemoryCatalogProvider { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In some other PR perhaps we can put the concrete implementations into their own modules. I don't think this one is big enough to warrant that yet, however; I just wanted to point it out |
||
| schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>, | ||
| } | ||
|
|
||
| impl MemoryCatalogProvider { | ||
| /// Instantiates a new MemoryCatalogProvider with an empty collection of schemas. | ||
| pub fn new() -> Self { | ||
| Self { | ||
| schemas: RwLock::new(HashMap::new()), | ||
| } | ||
| } | ||
|
|
||
| /// Adds a new schema to this catalog. | ||
| /// If a schema of the same name existed before, it is replaced in the catalog and returned. | ||
| pub fn register_schema( | ||
| &self, | ||
| name: impl Into<String>, | ||
| schema: Arc<dyn SchemaProvider>, | ||
| ) -> Option<Arc<dyn SchemaProvider>> { | ||
| let mut schemas = self.schemas.write().unwrap(); | ||
| schemas.insert(name.into(), schema) | ||
| } | ||
| } | ||
|
|
||
| impl CatalogProvider for MemoryCatalogProvider { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
|
|
||
| fn schema_names(&self) -> Vec<String> { | ||
| let schemas = self.schemas.read().unwrap(); | ||
| schemas.keys().cloned().collect() | ||
| } | ||
|
|
||
| fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> { | ||
| let schemas = self.schemas.read().unwrap(); | ||
| schemas.get(name).cloned() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| //! This module contains interfaces and default implementations | ||
| //! of table namespacing concepts, including catalogs and schemas. | ||
| pub mod catalog; | ||
| pub mod schema; | ||
|
|
||
| use crate::error::DataFusionError; | ||
| use std::convert::TryFrom; | ||
|
|
||
| /// Represents a resolved path to a table of the form "catalog.schema.table" | ||
| #[derive(Clone, Copy)] | ||
| pub struct ResolvedTableReference<'a> { | ||
| /// The catalog (aka database) containing the table | ||
| pub catalog: &'a str, | ||
| /// The schema containing the table | ||
| pub schema: &'a str, | ||
| /// The table name | ||
| pub table: &'a str, | ||
| } | ||
|
|
||
| /// Represents a path to a table that may require further resolution | ||
| #[derive(Clone, Copy)] | ||
| pub enum TableReference<'a> { | ||
| /// An unqualified table reference, e.g. "table" | ||
| Bare { | ||
| /// The table name | ||
| table: &'a str, | ||
| }, | ||
| /// A partially resolved table reference, e.g. "schema.table" | ||
| Partial { | ||
| /// The schema containing the table | ||
| schema: &'a str, | ||
| /// The table name | ||
| table: &'a str, | ||
| }, | ||
| /// A fully resolved table reference, e.g. "catalog.schema.table" | ||
| Full { | ||
| /// The catalog (aka database) containing the table | ||
| catalog: &'a str, | ||
| /// The schema containing the table | ||
| schema: &'a str, | ||
| /// The table name | ||
| table: &'a str, | ||
| }, | ||
| } | ||
|
|
||
| impl<'a> TableReference<'a> { | ||
| /// Retrieve the actual table name, regardless of qualification | ||
| pub fn table(&self) -> &str { | ||
| match self { | ||
| Self::Full { table, .. } | ||
| | Self::Partial { table, .. } | ||
| | Self::Bare { table } => table, | ||
| } | ||
| } | ||
|
|
||
| /// Given a default catalog and schema, ensure this table reference is fully resolved | ||
| pub fn resolve( | ||
| self, | ||
| default_catalog: &'a str, | ||
| default_schema: &'a str, | ||
| ) -> ResolvedTableReference<'a> { | ||
| match self { | ||
| Self::Full { | ||
| catalog, | ||
| schema, | ||
| table, | ||
| } => ResolvedTableReference { | ||
| catalog, | ||
| schema, | ||
| table, | ||
| }, | ||
| Self::Partial { schema, table } => ResolvedTableReference { | ||
| catalog: default_catalog, | ||
| schema, | ||
| table, | ||
| }, | ||
| Self::Bare { table } => ResolvedTableReference { | ||
| catalog: default_catalog, | ||
| schema: default_schema, | ||
| table, | ||
| }, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<'a> From<&'a str> for TableReference<'a> { | ||
| fn from(s: &'a str) -> Self { | ||
| Self::Bare { table: s } | ||
| } | ||
| } | ||
|
|
||
| impl<'a> From<ResolvedTableReference<'a>> for TableReference<'a> { | ||
| fn from(resolved: ResolvedTableReference<'a>) -> Self { | ||
| Self::Full { | ||
| catalog: resolved.catalog, | ||
| schema: resolved.schema, | ||
| table: resolved.table, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<'a> TryFrom<&'a sqlparser::ast::ObjectName> for TableReference<'a> { | ||
| type Error = DataFusionError; | ||
|
|
||
| fn try_from(value: &'a sqlparser::ast::ObjectName) -> Result<Self, Self::Error> { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| let idents = &value.0; | ||
|
|
||
| match idents.len() { | ||
| 1 => Ok(Self::Bare { | ||
| table: &idents[0].value, | ||
| }), | ||
| 2 => Ok(Self::Partial { | ||
| schema: &idents[0].value, | ||
| table: &idents[1].value, | ||
| }), | ||
| 3 => Ok(Self::Full { | ||
| catalog: &idents[0].value, | ||
| schema: &idents[1].value, | ||
| table: &idents[2].value, | ||
| }), | ||
| _ => Err(DataFusionError::Plan(format!( | ||
| "invalid table reference: {}", | ||
| value | ||
| ))), | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change to make
register_tablefallible is a breaking change, but a very reasonable one I think.