Merge remote-tracking branch 'origin/main' into csv-newlines-in-values
connec committed Jul 20, 2024
2 parents b9cc96b + 5da7ab3 commit 4d06432
Showing 5 changed files with 211 additions and 182 deletions.
datafusion/core/src/datasource/listing/table.rs (2 changes: 1 addition & 1 deletion)
@@ -1038,8 +1038,8 @@ mod tests {
     use crate::datasource::file_format::avro::AvroFormat;
     use crate::datasource::file_format::csv::CsvFormat;
     use crate::datasource::file_format::json::JsonFormat;
-    use crate::datasource::file_format::parquet::ParquetFormat;
     #[cfg(feature = "parquet")]
+    use crate::datasource::file_format::parquet::ParquetFormat;
     use crate::datasource::{provider_as_source, MemTable};
     use crate::execution::options::ArrowReadOptions;
     use crate::physical_plan::collect;
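Note on the change above: in Rust, a `#[cfg(...)]` attribute gates only the item immediately following it, so with the old ordering the `ParquetFormat` import was unconditional while the attribute fell on the next `use` statement instead; moving the import below the attribute puts the feature gate on the right item. A standalone sketch of the rule (illustrative only, not DataFusion code; the `parquet` feature name is just an example flag):

```rust
// `#[cfg]` attaches to the *next* item only.
#[cfg(feature = "parquet")]
use std::collections::HashMap; // compiled only with `--features parquet`

use std::collections::HashSet; // always compiled

fn main() {
    let _always: HashSet<i32> = HashSet::new();
    #[cfg(feature = "parquet")]
    let _gated: HashMap<i32, i32> = HashMap::new();
}
```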
datafusion/core/src/datasource/schema_adapter.rs (1 change: 1 addition & 0 deletions)
@@ -246,6 +246,7 @@ mod tests {
     use crate::datasource::schema_adapter::{
         SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
+    #[cfg(feature = "parquet")]
     use parquet::arrow::ArrowWriter;
     use tempfile::TempDir;
 
datafusion/core/src/execution/mod.rs (3 changes: 3 additions & 0 deletions)
@@ -19,6 +19,9 @@

 pub mod context;
 pub mod session_state;
+mod session_state_defaults;
+
+pub use session_state_defaults::SessionStateDefaults;
 
 // backwards compatibility
 pub use crate::datasource::file_format::options;
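Because `session_state_defaults` is added as a private module, the `pub use` re-export above is the public path to the moved type. A minimal sketch of downstream usage (assuming the `datafusion` core crate; only `default_file_formats`, which appears in the removed code below, is exercised):

```rust
use datafusion::execution::SessionStateDefaults;

fn main() {
    // The re-export keeps this path stable even though the implementation
    // now lives in execution/session_state_defaults.rs.
    let formats = SessionStateDefaults::default_file_formats();
    println!("{} default file formats", formats.len());
}
```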
datafusion/core/src/execution/session_state.rs (185 changes: 4 additions & 181 deletions)
@@ -18,29 +18,17 @@
 //! [`SessionState`]: information required to run queries in a session
 
 use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
-use crate::catalog::listing_schema::ListingSchemaProvider;
-use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
-use crate::catalog::{
-    CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
-    MemoryCatalogProviderList,
-};
+use crate::catalog::schema::SchemaProvider;
+use crate::catalog::{CatalogProviderList, MemoryCatalogProviderList};
 use crate::datasource::cte_worktable::CteWorkTable;
-use crate::datasource::file_format::arrow::ArrowFormatFactory;
-use crate::datasource::file_format::avro::AvroFormatFactory;
-use crate::datasource::file_format::csv::CsvFormatFactory;
-use crate::datasource::file_format::json::JsonFormatFactory;
-#[cfg(feature = "parquet")]
-use crate::datasource::file_format::parquet::ParquetFormatFactory;
 use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
 use crate::datasource::function::{TableFunction, TableFunctionImpl};
-use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
+use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::provider_as_source;
 use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
-#[cfg(feature = "array_expressions")]
-use crate::functions_array;
+use crate::execution::SessionStateDefaults;
 use crate::physical_optimizer::optimizer::PhysicalOptimizer;
 use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
-use crate::{functions, functions_aggregate};
 use arrow_schema::{DataType, SchemaRef};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
@@ -54,7 +42,6 @@ use datafusion_common::{
     ResolvedTableReference, TableReference,
 };
 use datafusion_execution::config::SessionConfig;
-use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_expr::execution_props::ExecutionProps;
@@ -85,7 +72,6 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::sync::Arc;
-use url::Url;
 use uuid::Uuid;
 
 /// Execution context for registering data sources and executing queries.
@@ -1420,169 +1406,6 @@ impl From<SessionState> for SessionStateBuilder {
     }
 }
 
-/// Defaults that are used as part of creating a SessionState such as table providers,
-/// file formats, registering of builtin functions, etc.
-pub struct SessionStateDefaults {}
-
-impl SessionStateDefaults {
-    /// returns a map of the default [`TableProviderFactory`]s
-    pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
-        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-            HashMap::new();
-        #[cfg(feature = "parquet")]
-        table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
-        table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
-
-        table_factories
-    }
-
-    /// returns the default MemoryCatalogProvider
-    pub fn default_catalog(
-        config: &SessionConfig,
-        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
-        runtime: &Arc<RuntimeEnv>,
-    ) -> MemoryCatalogProvider {
-        let default_catalog = MemoryCatalogProvider::new();
-
-        default_catalog
-            .register_schema(
-                &config.options().catalog.default_schema,
-                Arc::new(MemorySchemaProvider::new()),
-            )
-            .expect("memory catalog provider can register schema");
-
-        Self::register_default_schema(config, table_factories, runtime, &default_catalog);
-
-        default_catalog
-    }
-
-    /// returns the list of default [`ExprPlanner`]s
-    pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
-        let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
-            Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
-            // register crate of array expressions (if enabled)
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::ArrayFunctionPlanner),
-            #[cfg(feature = "array_expressions")]
-            Arc::new(functions_array::planner::FieldAccessPlanner),
-            #[cfg(any(
-                feature = "datetime_expressions",
-                feature = "unicode_expressions"
-            ))]
-            Arc::new(functions::planner::UserDefinedFunctionPlanner),
-        ];
-
-        expr_planners
-    }
-
-    /// returns the list of default [`ScalarUDF`]s
-    pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
-        let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
-        #[cfg(feature = "array_expressions")]
-        functions.append(&mut functions_array::all_default_array_functions());
-
-        functions
-    }
-
-    /// returns the list of default [`AggregateUDF`]s
-    pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
-        functions_aggregate::all_default_aggregate_functions()
-    }
-
-    /// returns the list of default [`FileFormatFactory`]s
-    pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
-        let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
-            #[cfg(feature = "parquet")]
-            Arc::new(ParquetFormatFactory::new()),
-            Arc::new(JsonFormatFactory::new()),
-            Arc::new(CsvFormatFactory::new()),
-            Arc::new(ArrowFormatFactory::new()),
-            Arc::new(AvroFormatFactory::new()),
-        ];
-
-        file_formats
-    }
-
-    /// registers all builtin functions - scalar, array and aggregate
-    pub fn register_builtin_functions(state: &mut SessionState) {
-        Self::register_scalar_functions(state);
-        Self::register_array_functions(state);
-        Self::register_aggregate_functions(state);
-    }
-
-    /// registers all the builtin scalar functions
-    pub fn register_scalar_functions(state: &mut SessionState) {
-        functions::register_all(state).expect("can not register built in functions");
-    }
-
-    /// registers all the builtin array functions
-    pub fn register_array_functions(state: &mut SessionState) {
-        // register crate of array expressions (if enabled)
-        #[cfg(feature = "array_expressions")]
-        functions_array::register_all(state).expect("can not register array expressions");
-    }
-
-    /// registers all the builtin aggregate functions
-    pub fn register_aggregate_functions(state: &mut SessionState) {
-        functions_aggregate::register_all(state)
-            .expect("can not register aggregate functions");
-    }
-
-    /// registers the default schema
-    pub fn register_default_schema(
-        config: &SessionConfig,
-        table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
-        runtime: &Arc<RuntimeEnv>,
-        default_catalog: &MemoryCatalogProvider,
-    ) {
-        let url = config.options().catalog.location.as_ref();
-        let format = config.options().catalog.format.as_ref();
-        let (url, format) = match (url, format) {
-            (Some(url), Some(format)) => (url, format),
-            _ => return,
-        };
-        let url = url.to_string();
-        let format = format.to_string();
-
-        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
-        let authority = match url.host_str() {
-            Some(host) => format!("{}://{}", url.scheme(), host),
-            None => format!("{}://", url.scheme()),
-        };
-        let path = &url.as_str()[authority.len()..];
-        let path = object_store::path::Path::parse(path).expect("Can't parse path");
-        let store = ObjectStoreUrl::parse(authority.as_str())
-            .expect("Invalid default catalog url");
-        let store = match runtime.object_store(store) {
-            Ok(store) => store,
-            _ => return,
-        };
-        let factory = match table_factories.get(format.as_str()) {
-            Some(factory) => factory,
-            _ => return,
-        };
-        let schema =
-            ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
-        let _ = default_catalog
-            .register_schema("default", Arc::new(schema))
-            .expect("Failed to register default schema");
-    }
-
-    /// registers the default [`FileFormatFactory`]s
-    pub fn register_default_file_formats(state: &mut SessionState) {
-        let formats = SessionStateDefaults::default_file_formats();
-        for format in formats {
-            if let Err(e) = state.register_file_format(format, false) {
-                log::info!("Unable to register default file format: {e}")
-            };
-        }
-    }
-}
-
 /// Adapter that implements the [`ContextProvider`] trait for a [`SessionState`]
 ///
 /// This is used so the SQL planner can access the state of the session without
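For context on what moved: every `SessionStateDefaults` method deleted above is an associated function, so embedders can pick individual defaults rather than taking the whole set. A hedged sketch of that usage (the `datafusion` crate path is assumed; the functions and return types are exactly those shown in the removed code):

```rust
use datafusion::execution::SessionStateDefaults;

fn main() {
    // Compose only the defaults you need; each call is independent.
    let table_factories = SessionStateDefaults::default_table_factories();
    let scalar_udfs = SessionStateDefaults::default_scalar_functions();
    let aggregate_udfs = SessionStateDefaults::default_aggregate_functions();

    println!(
        "{} table factories, {} scalar UDFs, {} aggregate UDFs",
        table_factories.len(),
        scalar_udfs.len(),
        aggregate_udfs.len()
    );
}
```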