diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs
index 8da52ed84a5f..1a8f15c8731b 100644
--- a/datafusion-cli/examples/cli-session-context.rs
+++ b/datafusion-cli/examples/cli-session-context.rs
@@ -82,7 +82,7 @@ impl CliSessionContext for MyUnionerContext {
 #[tokio::main]
 /// Runs the example.
 pub async fn main() {
-    let mut my_ctx = MyUnionerContext::default();
+    let my_ctx = MyUnionerContext::default();
 
     let mut print_options = PrintOptions {
         format: datafusion_cli::print_format::PrintFormat::Automatic,
@@ -91,7 +91,5 @@ pub async fn main() {
         color: true,
     };
 
-    exec_from_repl(&mut my_ctx, &mut print_options)
-        .await
-        .unwrap();
+    exec_from_repl(&my_ctx, &mut print_options).await.unwrap();
 }
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index 273eb30d3a71..c4636f1ce0e0 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -240,7 +240,7 @@ mod tests {
     use datafusion::prelude::SessionContext;
 
     fn setup_context() -> (SessionContext, Arc<DynamicFileSchemaProvider>) {
-        let mut ctx = SessionContext::new();
+        let ctx = SessionContext::new();
         ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
             ctx.state().catalog_list().clone(),
             ctx.state_weak_ref(),
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index 1a6c023d3b50..05c00d634c94 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -55,7 +55,7 @@ pub enum OutputFormat {
 impl Command {
     pub async fn execute(
         &self,
-        ctx: &mut dyn CliSessionContext,
+        ctx: &dyn CliSessionContext,
         print_options: &mut PrintOptions,
     ) -> Result<()> {
         match self {
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index b78f32e0ac48..178bce6f2fe6 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -49,7 +49,7 @@ use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the given print options
 pub async fn exec_from_commands(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     commands: Vec<String>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -62,7 +62,7 @@ pub async fn exec_from_commands(
 
 /// run and execute SQL statements and commands from a file, against a context with the given print options
 pub async fn exec_from_lines(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     reader: &mut BufReader<File>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -102,7 +102,7 @@ pub async fn exec_from_lines(
 }
 
 pub async fn exec_from_files(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     files: Vec<String>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -121,7 +121,7 @@
 
 /// run and execute SQL statements and commands against a context with the given print options
 pub async fn exec_from_repl(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     print_options: &mut PrintOptions,
 ) -> rustyline::Result<()> {
     let mut rl = Editor::new()?;
@@ -204,7 +204,7 @@ pub async fn exec_from_repl(
 }
 
 pub(super) async fn exec_and_print(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     print_options: &PrintOptions,
     sql: String,
 ) -> Result<()> {
@@ -300,7 +300,7 @@ fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
 }
 
 async fn create_plan(
-    ctx: &mut dyn CliSessionContext,
+    ctx: &dyn CliSessionContext,
     statement: Statement,
 ) -> Result<LogicalPlan> {
     let mut plan = ctx.session_state().statement_to_plan(statement).await?;
@@ -473,7 +473,7 @@ mod tests {
         "cos://bucket/path/file.parquet",
"gcs://bucket/path/file.parquet", ]; - let mut ctx = SessionContext::new(); + let ctx = SessionContext::new(); let task_ctx = ctx.task_ctx(); let dialect = &task_ctx.session_config().options().sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { @@ -488,7 +488,7 @@ mod tests { let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { //Should not fail - let mut plan = create_plan(&mut ctx, statement).await?; + let mut plan = create_plan(&ctx, statement).await?; if let LogicalPlan::Copy(copy_to) = &mut plan { assert_eq!(copy_to.output_url, location); assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string()); diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6266ae6f561a..1810d3cef57c 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -175,7 +175,7 @@ async fn main_inner() -> Result<()> { let runtime_env = create_runtime_env(rt_config.clone())?; - let mut ctx = + let ctx = SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)); ctx.refresh_catalogs().await?; // install dynamic catalog provider that knows how to open files @@ -212,20 +212,20 @@ async fn main_inner() -> Result<()> { if commands.is_empty() && files.is_empty() { if !rc.is_empty() { - exec::exec_from_files(&mut ctx, rc, &print_options).await?; + exec::exec_from_files(&ctx, rc, &print_options).await?; } // TODO maybe we can have thiserror for cli but for now let's keep it simple - return exec::exec_from_repl(&mut ctx, &mut print_options) + return exec::exec_from_repl(&ctx, &mut print_options) .await .map_err(|e| DataFusionError::External(Box::new(e))); } if !files.is_empty() { - exec::exec_from_files(&mut ctx, files, &print_options).await?; + exec::exec_from_files(&ctx, files, &print_options).await?; } if !commands.is_empty() { - exec::exec_from_commands(&mut ctx, commands, &print_options).await?; + exec::exec_from_commands(&ctx, commands, &print_options).await?; } Ok(()) diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index f9ead592c7ea..f770056026ed 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { let dir_a = prepare_example_data()?; let dir_b = prepare_example_data()?; - let mut ctx = SessionContext::new(); + let ctx = SessionContext::new(); let state = ctx.state(); let catlist = Arc::new(CustomCatalogProviderList::new()); diff --git a/datafusion/core/benches/filter_query_sql.rs b/datafusion/core/benches/filter_query_sql.rs index 01adc357b39a..0e09ae09d7c2 100644 --- a/datafusion/core/benches/filter_query_sql.rs +++ b/datafusion/core/benches/filter_query_sql.rs @@ -27,7 +27,7 @@ use futures::executor::block_on; use std::sync::Arc; use tokio::runtime::Runtime; -async fn query(ctx: &mut SessionContext, sql: &str) { +async fn query(ctx: &SessionContext, sql: &str) { let rt = Runtime::new().unwrap(); // execute the query @@ -70,25 +70,25 @@ fn criterion_benchmark(c: &mut Criterion) { let batch_size = 4096; // 2^12 c.bench_function("filter_array", |b| { - let mut ctx = create_context(array_len, batch_size).unwrap(); - b.iter(|| block_on(query(&mut ctx, "select f32, f64 from t where f32 >= f64"))) + let ctx = create_context(array_len, batch_size).unwrap(); + b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64"))) }); c.bench_function("filter_scalar", |b| { - let mut ctx = create_context(array_len, batch_size).unwrap(); 
+        let ctx = create_context(array_len, batch_size).unwrap();
         b.iter(|| {
             block_on(query(
-                &mut ctx,
+                &ctx,
                 "select f32, f64 from t where f32 >= 250 and f64 > 250",
             ))
         })
     });
 
     c.bench_function("filter_scalar in list", |b| {
-        let mut ctx = create_context(array_len, batch_size).unwrap();
+        let ctx = create_context(array_len, batch_size).unwrap();
         b.iter(|| {
             block_on(query(
-                &mut ctx,
+                &ctx,
                 "select f32, f64 from t where f32 in (10, 20, 30, 40)",
             ))
         })
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index c4c5a4aa0834..cc1a63cc05f7 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1550,7 +1550,7 @@ impl DataFrame {
     /// # #[tokio::main]
    /// # async fn main() -> Result<()> {
    /// # use datafusion_common::ScalarValue;
-    /// let mut ctx = SessionContext::new();
+    /// let ctx = SessionContext::new();
    /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
    /// let results = ctx
    ///   .sql("SELECT a FROM example WHERE b = $1")
@@ -2649,8 +2649,8 @@ mod tests {
 
     #[tokio::test]
     async fn registry() -> Result<()> {
-        let mut ctx = SessionContext::new();
-        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+        let ctx = SessionContext::new();
+        register_aggregate_csv(&ctx, "aggregate_test_100").await?;
 
         // declare the udf
         let my_fn: ScalarFunctionImplementation =
@@ -2783,8 +2783,8 @@ mod tests {
 
     /// Create a logical plan from a SQL query
     async fn create_plan(sql: &str) -> Result<LogicalPlan> {
-        let mut ctx = SessionContext::new();
-        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+        let ctx = SessionContext::new();
+        register_aggregate_csv(&ctx, "aggregate_test_100").await?;
         Ok(ctx.sql(sql).await?.into_unoptimized_plan())
     }
 
@@ -3147,9 +3147,9 @@ mod tests {
             "datafusion.sql_parser.enable_ident_normalization".to_owned(),
             "false".to_owned(),
         )]))?;
-        let mut ctx = SessionContext::new_with_config(config);
+        let ctx = SessionContext::new_with_config(config);
         let name = "aggregate_test_100";
-        register_aggregate_csv(&mut ctx, name).await?;
+        register_aggregate_csv(&ctx, name).await?;
         let df = ctx.table(name);
 
         let df = df
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 1abb550f5c98..2a23f045f3b2 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -189,14 +189,14 @@ mod tests {
     async fn write_parquet_with_small_rg_size() -> Result<()> {
         // This test verifies writing a parquet file with small rg size
         // relative to datafusion.execution.batch_size does not panic
-        let mut ctx = SessionContext::new_with_config(
-            SessionConfig::from_string_hash_map(HashMap::from_iter(
+        let ctx = SessionContext::new_with_config(SessionConfig::from_string_hash_map(
+            HashMap::from_iter(
                 [("datafusion.execution.batch_size", "10")]
                     .iter()
                     .map(|(s1, s2)| (s1.to_string(), s2.to_string())),
-            ))?,
-        );
-        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+            ),
+        )?);
+        register_aggregate_csv(&ctx, "aggregate_test_100").await?;
         let test_df = ctx.table("aggregate_test_100").await?;
 
         let output_path = "file://local/test.parquet";
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index c883b7a47fbb..c63ffddd81b3 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -179,7 +179,7 @@ where
 /// # use datafusion::{error::Result, assert_batches_eq};
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
-/// let mut ctx = SessionContext::new();
+/// let ctx = SessionContext::new();
 /// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
 /// let results = ctx
 ///   .sql("SELECT a, min(b) FROM example GROUP BY a LIMIT 100")
@@ -369,7 +369,7 @@ impl SessionContext {
     /// # use datafusion_execution::object_store::ObjectStoreUrl;
     /// let object_store_url = ObjectStoreUrl::parse("file://").unwrap();
     /// let object_store = object_store::local::LocalFileSystem::new();
-    /// let mut ctx = SessionContext::new();
+    /// let ctx = SessionContext::new();
     /// // All files with the file:// url prefix will be read from the local file system
     /// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
     /// ```
@@ -452,7 +452,7 @@ impl SessionContext {
     /// # use datafusion::{error::Result, assert_batches_eq};
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
-    /// let mut ctx = SessionContext::new();
+    /// let ctx = SessionContext::new();
     /// ctx
     ///   .sql("CREATE TABLE foo (x INTEGER)")
     ///   .await?
@@ -480,7 +480,7 @@ impl SessionContext {
     /// # use datafusion::physical_plan::collect;
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
-    /// let mut ctx = SessionContext::new();
+    /// let ctx = SessionContext::new();
     /// let options = SQLOptions::new()
     ///     .with_allow_ddl(false);
     /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
@@ -1357,7 +1357,7 @@ impl SessionContext {
     }
 
     /// Register [`CatalogProviderList`] in [`SessionState`]
-    pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogProviderList>) {
+    pub fn register_catalog_list(&self, catalog_list: Arc<dyn CatalogProviderList>) {
         self.state.write().register_catalog_list(catalog_list)
     }
 
@@ -1386,15 +1386,18 @@ impl FunctionRegistry for SessionContext {
     fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
         self.state.read().udwf(name)
     }
+
     fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
         self.state.write().register_udf(udf)
     }
+
     fn register_udaf(
         &mut self,
         udaf: Arc<AggregateUDF>,
     ) -> Result<Option<Arc<AggregateUDF>>> {
         self.state.write().register_udaf(udaf)
     }
+
     fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> {
         self.state.write().register_udwf(udwf)
     }
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index 9610a7f20364..937344ef5e4e 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -112,7 +112,7 @@ pub fn aggr_test_schema() -> SchemaRef {
 
 /// Register session context for the aggregate_test_100.csv file
 pub async fn register_aggregate_csv(
-    ctx: &mut SessionContext,
+    ctx: &SessionContext,
     table_name: &str,
 ) -> Result<()> {
     let schema = aggr_test_schema();
@@ -128,8 +128,8 @@ pub async fn register_aggregate_csv(
 
 /// Create a table from the aggregate_test_100.csv file with the specified name
 pub async fn test_table_with_name(name: &str) -> Result<DataFrame> {
-    let mut ctx = SessionContext::new();
-    register_aggregate_csv(&mut ctx, name).await?;
+    let ctx = SessionContext::new();
+    register_aggregate_csv(&ctx, name).await?;
     ctx.table(name).await
 }
diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs
index 47804b927e64..1aa33fc75e5d 100644
--- a/datafusion/core/tests/user_defined/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined/user_defined_plan.rs
@@ -103,7 +103,7 @@ use datafusion_optimizer::AnalyzerRule;
 
 /// Execute the specified sql and return the resulting record batches
 /// pretty printed as a String.
-async fn exec_sql(ctx: &mut SessionContext, sql: &str) -> Result<String> {
+async fn exec_sql(ctx: &SessionContext, sql: &str) -> Result<String> {
     let df = ctx.sql(sql).await?;
     let batches = df.collect().await?;
     pretty_format_batches(&batches)
@@ -112,25 +112,25 @@ async fn exec_sql(ctx: &mut SessionContext, sql: &str) -> Result<String> {
 }
 
 /// Create a test table.
-async fn setup_table(mut ctx: SessionContext) -> Result<SessionContext> {
+async fn setup_table(ctx: SessionContext) -> Result<SessionContext> {
     let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) STORED AS CSV location 'tests/data/customer.csv'";
 
     let expected = vec!["++", "++"];
 
-    let s = exec_sql(&mut ctx, sql).await?;
+    let s = exec_sql(&ctx, sql).await?;
     let actual = s.lines().collect::<Vec<&str>>();
 
     assert_eq!(expected, actual, "Creating table");
     Ok(ctx)
 }
 
-async fn setup_table_without_schemas(mut ctx: SessionContext) -> Result<SessionContext> {
+async fn setup_table_without_schemas(ctx: SessionContext) -> Result<SessionContext> {
     let sql =
         "CREATE EXTERNAL TABLE sales STORED AS CSV location 'tests/data/customer.csv'";
 
     let expected = vec!["++", "++"];
 
-    let s = exec_sql(&mut ctx, sql).await?;
+    let s = exec_sql(&ctx, sql).await?;
     let actual = s.lines().collect::<Vec<&str>>();
 
     assert_eq!(expected, actual, "Creating table");
@@ -146,7 +146,7 @@ const QUERY2: &str = "SELECT 42, arrow_typeof(42)";
 
 // Run the query using the specified execution context and compare it
 // to the known result
-async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Result<()> {
+async fn run_and_compare_query(ctx: SessionContext, description: &str) -> Result<()> {
     let expected = vec![
         "+-------------+---------+",
         "| customer_id | revenue |",
         "+-------------+---------+",
@@ -157,7 +157,7 @@ async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Re
         "+-------------+---------+",
     ];
 
-    let s = exec_sql(&mut ctx, QUERY).await?;
+    let s = exec_sql(&ctx, QUERY).await?;
     let actual = s.lines().collect::<Vec<&str>>();
 
     assert_eq!(
@@ -174,7 +174,7 @@ async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Re
 
 // Run the query using the specified execution context and compare it
 // to the known result
 async fn run_and_compare_query_with_analyzer_rule(
-    mut ctx: SessionContext,
+    ctx: SessionContext,
     description: &str,
 ) -> Result<()> {
     let expected = vec![
         "+------------+--------------------------+",
@@ -185,7 +185,7 @@ async fn run_and_compare_query_with_analyzer_rule(
         "+------------+--------------------------+",
     ];
 
-    let s = exec_sql(&mut ctx, QUERY2).await?;
+    let s = exec_sql(&ctx, QUERY2).await?;
     let actual = s.lines().collect::<Vec<&str>>();
 
     assert_eq!(
@@ -202,7 +202,7 @@ async fn run_and_compare_query_with_analyzer_rule(
 
 // Run the query using the specified execution context and compare it
 // to the known result
 async fn run_and_compare_query_with_auto_schemas(
-    mut ctx: SessionContext,
+    ctx: SessionContext,
     description: &str,
 ) -> Result<()> {
     let expected = vec![
         "+----------+----------+",
@@ -215,7 +215,7 @@ async fn run_and_compare_query_with_auto_schemas(
         "+----------+----------+",
     ];
 
-    let s = exec_sql(&mut ctx, QUERY1).await?;
+    let s = exec_sql(&ctx, QUERY1).await?;
     let actual = s.lines().collect::<Vec<&str>>();
 
     assert_eq!(
@@ -262,13 +262,13 @@ async fn topk_query() -> Result<()> {
 #[tokio::test]
 // Run EXPLAIN PLAN and show the plan was in fact rewritten
 async fn topk_plan() -> Result<()> {
-    let mut ctx = setup_table(make_topk_context()).await?;
+    let ctx = setup_table(make_topk_context()).await?;
 
     let mut expected = ["| logical_plan after topk | TopK: k=3 |", "| | TableScan: sales projection=[customer_id,revenue] |"].join("\n");
 
     let explain_query = format!("EXPLAIN VERBOSE {QUERY}");
-    let actual_output = exec_sql(&mut ctx, &explain_query).await?;
+    let actual_output = exec_sql(&ctx, &explain_query).await?;
 
     // normalize newlines (output on windows uses \r\n)
     let mut actual_output = actual_output.replace("\r\n", "\n");
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index b96398ef217f..e5c226418441 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -592,7 +592,9 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> {
     // Set specific Parquet format options
     let mut key_value_metadata = HashMap::new();
     key_value_metadata.insert("test".to_string(), Some("test".to_string()));
-    parquet_format.key_value_metadata = key_value_metadata.clone();
+    parquet_format
+        .key_value_metadata
+        .clone_from(&key_value_metadata);
 
     parquet_format.global.allow_single_file_parallelism = false;
     parquet_format.global.created_by = "test".to_string();
diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md
index a250e880913c..f86cea0bda95 100644
--- a/docs/source/library-user-guide/custom-table-providers.md
+++ b/docs/source/library-user-guide/custom-table-providers.md
@@ -146,7 +146,7 @@ For filters that can be pushed down, they'll be passed to the `scan` method as t
 In order to use the custom table provider, we need to register it with DataFusion. This is done by creating a `TableProvider` and registering it with the `SessionContext`.
 
 ```rust
-let mut ctx = SessionContext::new();
+let ctx = SessionContext::new();
 
 let custom_table_provider = CustomDataSource::new();
 ctx.register_table("custom_table", Arc::new(custom_table_provider));
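Note for reviewers: the net effect of this patch is that callers no longer need a `mut` binding to register tables or run SQL, since `SessionContext` methods take `&self` and synchronize through the `RwLock`-guarded state visible above (`self.state.read()` / `self.state.write()`). A minimal sketch of the resulting caller-side pattern; the CSV path and column names are illustrative, borrowed from the doc examples touched in this diff:

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // No `mut` binding needed: registration goes through &self, with the
    // context's internal RwLock providing the mutability.
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;

    // The same shared reference can be handed to helper functions or tasks.
    let df = ctx.sql("SELECT a, min(b) FROM example GROUP BY a").await?;
    df.show().await?;
    Ok(())
}
```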