Skip to content

Commit

Permalink
Create table prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
svilen-mihaylov-db committed Aug 19, 2024
1 parent bd48262 commit 27a5397
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 7 deletions.
242 changes: 240 additions & 2 deletions datafusion/core/tests/custom_sources_cases/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use datafusion::{
prelude::SessionContext,
scalar::ScalarValue,
};
use datafusion_common::{project_schema, stats::Precision};
use datafusion_common::{project_schema, stats::Precision, DataFusionError};
use datafusion_physical_expr::EquivalenceProperties;

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion::catalog_common::{MemoryCatalogProviderList, MemorySchemaProvider};
use datafusion_catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};

/// This is a testing structure for statistics
/// It will act both as a table provider and execution plan
Expand Down Expand Up @@ -299,3 +300,240 @@ async fn sql_window() -> Result<()> {

Ok(())
}

struct WrappingTableProvider {
underlying: Arc<dyn TableProvider>,
}

impl WrappingTableProvider {
fn new(underlying: Arc<dyn TableProvider>) -> Self {
println!("Creating new custom table provider.");
Self {
underlying: underlying,
}
}

fn deregister(&self) {
println!("Deregistering custom table provider.");
}
}

#[async_trait]
impl TableProvider for WrappingTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.underlying.schema()
}

fn table_type(&self) -> TableType {
self.underlying.table_type()
}

async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
self.underlying
.scan(state, projection, filters, limit)
.await
}
}

struct TestSchemaProvider {
underlying: MemorySchemaProvider,
}

impl TestSchemaProvider {
pub fn new() -> Self {
Self {
underlying: MemorySchemaProvider::new(),
}
}
}

impl Default for TestSchemaProvider {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SchemaProvider for TestSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.underlying.table_names()
}

async fn table(
&self,
name: &str,
) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
self.underlying.table(name).await
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> Result<Option<Arc<dyn TableProvider>>> {
println!("Registering table {}", name);

let schema = table.schema();
for field in &schema.fields {
println!("Field {} options", field.name());
for (k, v) in field.metadata() {
println!("{}={}", k, v)
}
}

println!("Metadata");
for (k, v) in &schema.metadata {
println!("{}={}", k, v)
}

// Here we can register our own TableProvider instance instead of a memory table 'table'.
self.underlying
.register_table(name.clone(), Arc::new(WrappingTableProvider::new(table)))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
println!("Deregistering table {}", name);
let result = self.underlying.deregister_table(name);
if let Ok(result) = result {
if let Some(result) = result {
let provider: &WrappingTableProvider =
match result.as_any().downcast_ref::<WrappingTableProvider>() {
Some(p) => p,
None => panic!("Unexpected table provider"),
};
provider.deregister();
return Ok(Some(result));
}
return Ok(result);
}
result
}

fn table_exist(&self, name: &str) -> bool {
self.underlying.table_exist(name)
}
}

struct TestCatalogProvider {
provider: Arc<TestSchemaProvider>,
}

impl TestCatalogProvider {
pub fn new() -> Self {
Self {
provider: Arc::new(TestSchemaProvider::new()),
}
}
}

impl Default for TestCatalogProvider {
fn default() -> Self {
Self::new()
}
}

impl CatalogProvider for TestCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
vec!["public".to_owned()]
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
assert_eq!("public", name);
Some(self.provider.clone())
}
}

struct TestCatalogProviderList {
underlying: MemoryCatalogProviderList,
}

impl TestCatalogProviderList {
pub fn new() -> Self {
Self {
underlying: MemoryCatalogProviderList::new(),
}
}
}

impl Default for TestCatalogProviderList {
fn default() -> Self {
Self::new()
}
}

impl CatalogProviderList for TestCatalogProviderList {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.underlying.register_catalog(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
self.underlying.catalog_names()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.underlying.catalog(name)
}
}

#[tokio::test]
async fn sql_basic1() -> Result<()> {
let (stats, schema) = fully_defined();
let ctx = init_ctx(stats.clone(), schema)?;

let catalog_list = Arc::new(TestCatalogProviderList::new());
catalog_list.register_catalog(
"datafusion".to_string(),
Arc::new(TestCatalogProvider::new()),
);
ctx.register_catalog_list(catalog_list);

let _df = ctx
.sql(
"CREATE TABLE t (
a integer options (dimension=1, lower_bound='0'),
b integer)
ENGINE=mytile
OPTIONS (
v=2,
v1=4,
engine_option='etc etc'
)",
)
.await
.unwrap();

let _df1 = ctx.sql("DROP TABLE t").await.unwrap();

// let physical_plan = df.create_physical_plan().await.unwrap();

// the statistics should be those of the source
// assert_eq!(stats, physical_plan.statistics()?);

Ok(())
}
24 changes: 22 additions & 2 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,31 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.options
.iter()
.any(|x| x.option == ColumnOption::NotNull);
fields.push(Field::new(

let mut metadata: HashMap<String, String> = HashMap::new();
column.options.iter().for_each(|v| {
if v.name.clone().map_or(true, |ident| ident.value.is_empty()) {
// Only consider empty option names here.
if let ColumnOption::Options(options) = v.option.clone() {
// Only consider options (as opposed to foreign keys, etc.)
for option in options {
if let sqlparser::ast::Expr::Value(value) = option.value {
// Only consider values (as opposed to expressions etc.)
metadata.insert(option.name.value, value.to_string());
}
}
}
}
});

let field = Field::new(
self.ident_normalizer.normalize(column.name),
data_type,
!not_nullable,
));
)
.with_metadata(metadata);

fields.push(field);
}

Ok(Schema::new(fields))
Expand Down
20 changes: 17 additions & 3 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ use sqlparser::ast::{
Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert,
ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr,
ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor,
TableWithJoins, TransactionMode, UnaryOperator, Value,
ShowCreateObject, ShowStatementFilter, SqlOption, Statement, TableConstraint,
TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
};
use sqlparser::parser::ParserError::ParserError;

Expand Down Expand Up @@ -189,6 +189,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
self.sql_statement_to_plan_with_context_impl(statement, planner_context)
}

fn options_to_schema_metadata(
options: &Option<Vec<SqlOption>>,
) -> HashMap<String, String> {
let mut metadata: HashMap<String, String> = HashMap::new();
for option in options.as_ref().unwrap_or(&vec![]) {
metadata.insert(option.name.to_string(), option.value.to_string());
}
metadata
}

fn sql_statement_to_plan_with_context_impl(
&self,
statement: Statement,
Expand Down Expand Up @@ -229,6 +239,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
with_options,
if_not_exists,
or_replace,
options,
..
}) if table_properties.is_empty() && with_options.is_empty() => {
// Merge inline constraints and existing constraints
Expand Down Expand Up @@ -290,7 +301,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

None => {
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
let schema = self
.build_schema(columns)?
.with_metadata(Self::options_to_schema_metadata(&options))
.to_dfschema_ref()?;
let plan = EmptyRelation {
produce_one_row: false,
schema,
Expand Down

0 comments on commit 27a5397

Please sign in to comment.