Extract catalog API to separate crate
This moves `CatalogProvider`, `TableProvider`, and `SchemaProvider` to a new
`datafusion-catalog` crate.  The circular dependency between the core
`SessionState` and provider implementations is broken by introducing a
`CatalogSession` dyn trait.  Implementations of `TableProvider` that
reside under core currently get access to the concrete session state by
downcasting the `CatalogSession`. This is intended as an intermediate step.
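
A minimal sketch of the downcast pattern the message describes, built from stand-in types (the real trait lives in the new crate; the `as_any` accessor here is an assumption, mirroring the pattern DataFusion uses elsewhere):

```rust
use std::any::Any;

// Stand-in for the object-safe trait that `datafusion-catalog` can name
// without depending on core.
pub trait CatalogSession: Send + Sync {
    /// Escape hatch allowing core code to recover the concrete session type.
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for core's concrete session type.
pub struct SessionState;

impl CatalogSession for SessionState {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// A `TableProvider` under core receives `&dyn CatalogSession` in `scan`
// and downcasts when it needs core-only APIs.
fn use_core_state(state: &dyn CatalogSession) {
    if let Some(state) = state.as_any().downcast_ref::<SessionState>() {
        // Full `SessionState` available here.
        let _ = state;
    }
}
```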
findepi committed Jul 17, 2024
1 parent de0765a commit 5bbf5d7
Showing 42 changed files with 808 additions and 506 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"]
members = [
"datafusion/common",
"datafusion/common-runtime",
"datafusion/catalog",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
@@ -87,6 +88,7 @@ chrono = { version = "0.4.34", default-features = false }
ctor = "0.2.0"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false }
datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" }
datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" }
datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }
13 changes: 13 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion datafusion-cli/src/functions.rs
@@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use async_trait::async_trait;

use datafusion::catalog_api::CatalogSession;
use datafusion::common::{plan_err, Column};
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
@@ -234,7 +235,7 @@ impl TableProvider for ParquetMetadataTable {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
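The same one-line signature change repeats across the providers below. As a point of reference, here is a hedged sketch of a complete `TableProvider` written against the new signature (`OneBatchTable` is invented for illustration; import paths follow the hunks above):

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::Expr;

/// A toy table serving a single pre-built `RecordBatch`.
struct OneBatchTable {
    schema: SchemaRef,
    batch: RecordBatch,
}

#[async_trait]
impl TableProvider for OneBatchTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    // Previously `_state: &SessionState`; now only the object-safe
    // `CatalogSession` trait from the new crate is required.
    async fn scan(
        &self,
        _state: &dyn CatalogSession,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MemoryExec::try_new(
            &[vec![self.batch.clone()]],
            self.schema(),
            projection.cloned(),
        )?))
    }
}
```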
6 changes: 3 additions & 3 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::{
ParquetAccessPlan, ParquetExecBuilder,
@@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -271,7 +271,7 @@ impl IndexTableProvider {
/// to a single predicate like `a = 1 AND b = 2` suitable for execution
fn filters_to_predicate(
&self,
state: &SessionState,
state: &dyn CatalogSession,
filters: &[Expr],
) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(self.schema())?;
@@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
6 changes: 2 additions & 4 deletions datafusion-examples/examples/catalog.rs
@@ -19,10 +19,8 @@
use async_trait::async_trait;
use datafusion::{
arrow::util::pretty,
catalog::{
schema::SchemaProvider,
{CatalogProvider, CatalogProviderList},
},
catalog::CatalogProviderList,
catalog_api::{CatalogProvider, SchemaProvider},
datasource::{
file_format::{csv::CsvFormat, FileFormat},
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
5 changes: 3 additions & 2 deletions datafusion-examples/examples/custom_datasource.rs
@@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
@@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder;
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use datafusion::catalog_api::CatalogSession;
use tokio::time::timeout;

/// This example demonstrates executing a simple query against a custom datasource
@@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[Expr],
4 changes: 2 additions & 2 deletions datafusion-examples/examples/parquet_index.rs
@@ -23,13 +23,13 @@ use arrow::datatypes::Int32Type;
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
parquet::StatisticsConverter,
{FileScanConfig, ParquetExec},
};
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::{
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
@@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider {

async fn scan(
&self,
state: &SessionState,
state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
6 changes: 3 additions & 3 deletions datafusion-examples/examples/simple_udtf.rs
@@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::function::TableFunctionImpl;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::{ExecutionProps, SessionState};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
@@ -35,7 +36,6 @@ use std::fs::File;
use std::io::Seek;
use std::path::Path;
use std::sync::Arc;

// To define your own table function, you only need to do the following 3 things:
// 1. Implement your own [`TableProvider`]
// 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable {

async fn scan(
&self,
_state: &SessionState,
_state: &dyn CatalogSession,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
38 changes: 38 additions & 0 deletions datafusion/catalog/Cargo.toml
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "datafusion-catalog"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license.workspace = true
readme.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true

[dependencies]
arrow-schema = { workspace = true }
async-trait = "0.1.41"
datafusion-expr = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-physical-plan = { workspace = true }

[lints]
workspace = true
153 changes: 153 additions & 0 deletions datafusion/catalog/src/catalog.rs
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::sync::Arc;

pub use crate::schema::SchemaProvider;
use datafusion_common::not_impl_err;
use datafusion_common::Result;

/// Represents a catalog, comprising a number of named schemas.
///
/// # Catalog Overview
///
/// To plan and execute queries, DataFusion needs a "Catalog" that provides
/// metadata such as which schemas and tables exist, their columns and data
/// types, and how to access the data.
///
/// The Catalog API consists of:
/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s
/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
/// * [`TableProvider`]: individual tables
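///
/// A minimal sketch of walking this hierarchy top-down (names such as
/// `"datafusion"` and `"public"` are illustrative defaults, and
/// [`SchemaProvider::table`] is `async`):
///
/// ```ignore
/// let catalog = catalog_list.catalog("datafusion").expect("catalog exists");
/// let schema = catalog.schema("public").expect("schema exists");
/// let table = schema.table("my_table").await?.expect("table exists");
/// ```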
///
/// # Implementing Catalogs
///
/// To implement a catalog, you implement at least one of the [`CatalogProviderList`],
/// [`CatalogProvider`], and [`SchemaProvider`] traits and register them
/// appropriately with the [`SessionContext`].
///
/// [`SessionContext`]: crate::execution::context::SessionContext
///
/// DataFusion comes with a simple in-memory catalog implementation,
/// [`MemoryCatalogProvider`], that is used by default and has no persistence.
/// DataFusion does not include more complex Catalog implementations because
/// catalog management is a key design choice for most data systems, and thus
/// it is unlikely that any general-purpose catalog implementation will work
/// well across many use cases.
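///
/// For example, a custom provider is typically registered on a
/// `SessionContext` (sketch; `MyCatalog` is a placeholder type):
///
/// ```ignore
/// let ctx = SessionContext::new();
/// ctx.register_catalog("my_catalog", Arc::new(MyCatalog::default()));
/// ```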
///
/// # Implementing "Remote" catalogs
///
/// Sometimes catalog information is stored remotely and requires a network call
/// to retrieve. For example, the [Delta Lake] table format stores table
/// metadata in files on S3 that must first be downloaded to discover what
/// schemas and tables exist.
///
/// [Delta Lake]: https://delta.io/
///
/// The [`CatalogProvider`] can support this use case, but it takes some care.
/// The planning APIs in DataFusion are not `async` and thus network IO cannot
/// be performed "lazily" / "on demand" during query planning. The rationale for
/// this design is that using remote procedure calls for all catalog accesses
/// required for query planning would likely incur multiple network calls
/// per plan, leading to very poor planning performance.
///
/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs,
/// you need to provide an in-memory snapshot of the required metadata. Most
/// systems either already have this information cached locally or can
/// batch access to the remote catalog to retrieve multiple schemas and tables
/// in a single network call.
///
/// Note that [`SchemaProvider::table`] is an `async` function in order to
/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
/// is easy to list all available tables but there is additional non-trivial
/// access required to read table details (e.g. statistics).
///
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
/// the query to [find all table references], perform the required remote
/// catalog accesses in parallel, and then plan the query using that snapshot.
///
/// [find all table references]: resolve_table_references
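///
/// A hedged sketch of that snapshot approach (the struct and its field are
/// illustrative only):
///
/// ```ignore
/// struct SnapshotSchemaProvider {
///     /// Tables prefetched from the remote catalog before planning starts.
///     tables: HashMap<String, Arc<dyn TableProvider>>,
/// }
///
/// #[async_trait]
/// impl SchemaProvider for SnapshotSchemaProvider {
///     fn as_any(&self) -> &dyn Any { self }
///
///     fn table_names(&self) -> Vec<String> {
///         self.tables.keys().cloned().collect()
///     }
///
///     async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
///         // No network IO at plan time: everything is served from the snapshot.
///         Ok(self.tables.get(name).cloned())
///     }
///
///     fn table_exist(&self, name: &str) -> bool {
///         self.tables.contains_key(name)
///     }
/// }
/// ```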
///
/// # Example Catalog Implementations
///
/// Here are some examples of how to implement custom catalogs:
///
/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
/// that treats files and directories on a filesystem as tables.
///
/// * [`catalog.rs`]: a simple directory-based catalog example.
///
/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
/// read from Delta Lake tables
///
/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs
/// [delta-rs]: https://github.com/delta-io/delta-rs
/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
///
/// [`TableProvider`]: crate::datasource::TableProvider
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`]
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;

/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

/// Adds a new schema to this catalog.
///
/// If a schema of the same name existed before, it is replaced in
/// the catalog and returned.
///
/// By default returns a "Not Implemented" error
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
// use variables to avoid unused variable warnings
let _ = name;
let _ = schema;
not_impl_err!("Registering new schemas is not supported")
}

/// Removes a schema from this catalog. Implementations of this method should return
/// errors if the schema exists but cannot be dropped. For example, in DataFusion's
/// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
/// will only be successfully dropped when `cascade` is true.
/// This is equivalent to how DROP SCHEMA works in PostgreSQL.
///
/// Implementations of this method should return None if schema with `name`
/// does not exist.
///
/// By default returns a "Not Implemented" error
fn deregister_schema(
&self,
_name: &str,
_cascade: bool,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
not_impl_err!("Deregistering new schemas is not supported")
}
}
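
To make the trait concrete, here is a hedged sketch of a minimal lock-protected catalog (assuming the new crate re-exports `CatalogProvider` and `SchemaProvider` at its root; `SimpleCatalog` is invented for illustration):

```rust
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use datafusion_catalog::{CatalogProvider, SchemaProvider};
use datafusion_common::Result;

/// A toy catalog mapping schema names to providers behind a `RwLock`.
#[derive(Default)]
struct SimpleCatalog {
    schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}

impl CatalogProvider for SimpleCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.schemas.read().unwrap().keys().cloned().collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.read().unwrap().get(name).cloned()
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // Replaces and returns any provider previously stored under `name`.
        Ok(self
            .schemas
            .write()
            .unwrap()
            .insert(name.to_string(), schema))
    }
    // `deregister_schema` keeps the default "not implemented" behavior.
}
```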