Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions rust/datafusion/src/bin/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ pub async fn main() {
.map(|size| size.parse::<usize>().unwrap())
.unwrap_or(1_048_576);

let mut ctx =
ExecutionContext::with_config(ExecutionConfig::new().with_batch_size(batch_size));
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new()
.with_batch_size(batch_size)
.with_information_schema(true),
);

let mut rl = Editor::<()>::new();
rl.load_history(".history").ok();
Expand Down
61 changes: 61 additions & 0 deletions rust/datafusion/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,67 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {
    /// Returns the catalog list as [`Any`](std::any::Any)
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Adds a new catalog to this catalog list.
    ///
    /// If a catalog of the same name existed before, it is replaced in
    /// the list and returned.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>>;

    /// Retrieves the list of available catalog names.
    ///
    /// Note: returns owned `String`s (rather than `&[&str]`) because
    /// implementations such as `MemoryCatalogList` hold their names
    /// behind a lock and cannot hand out borrows.
    fn catalog_names(&self) -> Vec<String>;

    /// Retrieves a specific catalog by name, provided it exists.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}

/// Simple in-memory list of catalogs
pub struct MemoryCatalogList {
    /// Collection of catalogs containing schemas and ultimately TableProviders
    // RwLock allows concurrent reads of the catalog map; writers (via
    // register_catalog) take the lock exclusively.
    pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
}

impl MemoryCatalogList {
/// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs
pub fn new() -> Self {
Self {
catalogs: RwLock::new(HashMap::new()),
}
}
}

impl CatalogList for MemoryCatalogList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Inserts `catalog` under `name`, returning any previous catalog
    /// that was registered under the same name.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.write().unwrap().insert(name, catalog)
    }

    /// Snapshot of the currently registered catalog names.
    fn catalog_names(&self) -> Vec<String> {
        self.catalogs.read().unwrap().keys().cloned().collect()
    }

    /// Looks up a catalog by name; `None` if it was never registered.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.read().unwrap().get(name).cloned()
    }
}

/// Represents a catalog, comprising a number of named schemas.
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`](std::any::Any)
Expand Down
223 changes: 223 additions & 0 deletions rust/datafusion/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implements the SQL [Information Schema] for DataFusion.
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema

use std::{any, sync::Arc};

use arrow::{
array::StringBuilder,
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};

use crate::datasource::{MemTable, TableProvider};

use super::{
catalog::{CatalogList, CatalogProvider},
schema::SchemaProvider,
};

/// Name of the virtual schema exposed by this module.
const INFORMATION_SCHEMA: &str = "information_schema";
/// Name of the `information_schema.tables` virtual table.
const TABLES: &str = "tables";

/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
    /// Full list of catalogs; used to build the information_schema
    /// tables on demand, so they reflect the current registrations.
    catalog_list: Arc<dyn CatalogList>,
    /// wrapped provider
    inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
    /// Wraps `inner` so that, in addition to `inner`'s own schemas, an
    /// `information_schema` schema derived from `catalog_list` is exposed.
    pub(crate) fn new(
        catalog_list: Arc<dyn CatalogList>,
        inner: Arc<dyn CatalogProvider>,
    ) -> Self {
        Self {
            catalog_list,
            inner,
        }
    }
}

impl CatalogProvider for CatalogWithInformationSchema {
    fn as_any(&self) -> &dyn any::Any {
        self
    }

    /// All schema names of the wrapped provider, with
    /// `information_schema` appended at the end.
    fn schema_names(&self) -> Vec<String> {
        let mut names = self.inner.schema_names();
        names.push(INFORMATION_SCHEMA.to_string());
        names
    }

    /// Resolves `information_schema` (case-insensitively) to the virtual
    /// provider; every other name is delegated to the wrapped provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
            let provider = InformationSchemaProvider {
                catalog_list: self.catalog_list.clone(),
            };
            Some(Arc::new(provider))
        } else {
            self.inner.schema(name)
        }
    }
}

/// Implements the `information_schema` virtual schema and tables
///
/// The underlying tables in the `information_schema` are created on
/// demand. This means that if more tables are added to the underlying
/// providers, they will appear the next time the `information_schema`
/// table is queried.
struct InformationSchemaProvider {
    /// Catalogs to introspect when a virtual table is queried.
    catalog_list: Arc<dyn CatalogList>,
}

impl SchemaProvider for InformationSchemaProvider {
    fn as_any(&self) -> &(dyn any::Any + 'static) {
        self
    }

    /// The only table currently provided is `tables`.
    fn table_names(&self) -> Vec<String> {
        vec![TABLES.to_string()]
    }

    /// Builds the requested virtual table on demand, so its contents
    /// always reflect the current state of the catalog list.
    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        // Use the TABLES constant rather than a repeated string literal,
        // consistent with table_names() above.
        if name.eq_ignore_ascii_case(TABLES) {
            // create a mem table with the names of tables
            let mut builder = InformationSchemaTablesBuilder::new();

            for catalog_name in self.catalog_list.catalog_names() {
                let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

                for schema_name in catalog.schema_names() {
                    // Skip the virtual schema itself here; it is reported
                    // separately below as a system table.
                    if schema_name != INFORMATION_SCHEMA {
                        let schema = catalog.schema(&schema_name).unwrap();
                        for table_name in schema.table_names() {
                            builder.add_base_table(
                                &catalog_name,
                                &schema_name,
                                table_name,
                            )
                        }
                    }
                }

                // Add a final list for the information schema tables themselves
                builder.add_system_table(&catalog_name, INFORMATION_SCHEMA, TABLES);
            }

            let mem_table = builder.build();

            Some(Arc::new(mem_table))
        } else {
            None
        }
    }
}

/// Builds the `information_schema.tables` table row by row
///
/// One `StringBuilder` per output column; the `add_*` methods append
/// to all four builders in lock step so the columns stay aligned.

struct InformationSchemaTablesBuilder {
    catalog_names: StringBuilder,
    schema_names: StringBuilder,
    table_names: StringBuilder,
    table_types: StringBuilder,
}

impl InformationSchemaTablesBuilder {
    fn new() -> Self {
        // StringBuilder requires providing an initial capacity, so
        // pick 10 here arbitrarily as this is not performance
        // critical code and the number of tables is unavailable here.
        let default_capacity = 10;
        Self {
            catalog_names: StringBuilder::new(default_capacity),
            schema_names: StringBuilder::new(default_capacity),
            table_names: StringBuilder::new(default_capacity),
            table_types: StringBuilder::new(default_capacity),
        }
    }

    /// Appends a row describing a user-registered table
    /// (table_type "BASE TABLE").
    fn add_base_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
    ) {
        self.add_table(catalog_name, schema_name, table_name, "BASE TABLE");
    }

    /// Appends a row describing a system-provided table, such as the
    /// information_schema tables themselves (table_type "VIEW",
    /// consistent with Postgres and SQL Server).
    fn add_system_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
    ) {
        self.add_table(catalog_name, schema_name, table_name, "VIEW");
    }

    /// Shared implementation for the `add_*` methods: appends one row
    /// with the given `table_type`, keeping all four columns in sync.
    fn add_table(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        table_name: impl AsRef<str>,
        table_type: &str,
    ) {
        // Note: append_value is actually infallible.
        self.catalog_names
            .append_value(catalog_name.as_ref())
            .unwrap();
        self.schema_names
            .append_value(schema_name.as_ref())
            .unwrap();
        self.table_names.append_value(table_name.as_ref()).unwrap();
        self.table_types.append_value(table_type).unwrap();
    }

    /// Consumes the builder, producing a single-batch `MemTable` with the
    /// standard `information_schema.tables` column layout.
    fn build(self) -> MemTable {
        let schema = Schema::new(vec![
            Field::new("table_catalog", DataType::Utf8, false),
            Field::new("table_schema", DataType::Utf8, false),
            Field::new("table_name", DataType::Utf8, false),
            Field::new("table_type", DataType::Utf8, false),
        ]);

        // Destructure so the column builders can be finished by value.
        let Self {
            mut catalog_names,
            mut schema_names,
            mut table_names,
            mut table_types,
        } = self;

        let schema = Arc::new(schema);
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(catalog_names.finish()),
                Arc::new(schema_names.finish()),
                Arc::new(table_names.finish()),
                Arc::new(table_types.finish()),
            ],
        )
        .unwrap();

        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
    }
}
1 change: 1 addition & 0 deletions rust/datafusion/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! of table namespacing concepts, including catalogs and schemas.

pub mod catalog;
pub mod information_schema;
pub mod schema;

use crate::error::DataFusionError;
Expand Down
Loading