Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename SessionContext::with_config_rt to SessionContext::new_with_config_from_rt, etc #7631

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/src/bin/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn group_by(opt: &GroupBy) -> Result<()> {
let mut config = ConfigOptions::from_env()?;
config.execution.batch_size = 65535;

let ctx = SessionContext::with_config(config.into());
let ctx = SessionContext::new_with_config(config.into());

let schema = Schema::new(vec![
Field::new("id1", DataType::Utf8, false),
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl RunOpt {
};

let config = self.common.config();
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
self.register_hits(&ctx).await?;

let iterations = self.common.iterations;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/parquet_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl RunOpt {
));
for i in 0..self.common.iterations {
let config = self.common.update_config(scan_options.config());
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);

let (rows, elapsed) = exec_scan(
&ctx,
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl RunOpt {
for i in 0..self.common.iterations {
let config =
SessionConfig::new().with_target_partitions(self.common.partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
let (rows, elapsed) =
exec_sort(&ctx, &expr, &test_file, self.common.debug).await?;
let ms = elapsed.as_secs_f64() * 1000.0;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ConvertOpt {
.file_extension(".tbl");

let config = SessionConfig::new().with_batch_size(self.batch_size);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);

// build plan to read the TBL file
let mut csv = ctx.read_csv(&input_path, options).await?;
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl RunOpt {
.common
.config()
.with_collect_statistics(!self.disable_statistics);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);

// register tables
self.register_tables(&ctx).await?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub async fn main() -> Result<()> {
let runtime_env = create_runtime_env(rn_config.clone())?;

let mut ctx =
SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
ctx.refresh_catalogs().await?;
// install dynamic catalog provider that knows how to open files
ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl FlightSqlServiceImpl {
let session_config = SessionConfig::from_env()
.map_err(|e| Status::internal(format!("Error building plan: {e}")))?
.with_information_schema(true);
let ctx = Arc::new(SessionContext::with_config(session_config));
let ctx = Arc::new(SessionContext::new_with_config(session_config));

let testdata = datafusion::test_util::parquet_test_data();

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/parquet_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ fn criterion_benchmark(c: &mut Criterion) {

let partitions = 4;
let config = SessionConfig::new().with_target_partitions(partitions);
let context = SessionContext::with_config(config);
let context = SessionContext::new_with_config(config);

let local_rt = tokio::runtime::Builder::new_current_thread()
.build()
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/benches/sort_limit_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ fn create_context() -> Arc<Mutex<SessionContext>> {

rt.block_on(async {
// create local session context
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(1));
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(1),
);

let table_provider = Arc::new(csv.await);
let mem_table = MemTable::load(table_provider, Some(partitions), &ctx.state())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/sql_query_with_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async fn setup_context(object_store: Arc<dyn ObjectStore>) -> SessionContext {
let config = SessionConfig::new().with_target_partitions(THREADS);
let rt = Arc::new(RuntimeEnv::default());
rt.register_object_store(&Url::parse("data://my_store").unwrap(), object_store);
let context = SessionContext::with_config_rt(config, rt);
let context = SessionContext::new_with_config_rt(config, rt);

for table_id in 0..TABLES {
let table_name = table_name(table_id);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/benches/topk_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn create_context(
let mut cfg = SessionConfig::new();
let opts = cfg.options_mut();
opts.optimizer.enable_topk_aggregation = use_topk;
let ctx = SessionContext::with_config(cfg);
let ctx = SessionContext::new_with_config(cfg);
let _ = ctx.register_table("traces", mem_table)?;
let sql = format!("select trace_id, max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};");
let df = ctx.sql(sql.as_str()).await?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1238,7 +1238,7 @@ impl DataFrame {
/// # }
/// ```
pub async fn cache(self) -> Result<DataFrame> {
let context = SessionContext::with_state(self.session_state.clone());
let context = SessionContext::new_with_state(self.session_state.clone());
let mem_table = MemTable::try_new(
SchemaRef::from(self.schema().clone()),
self.collect_partitioned().await?,
Expand Down Expand Up @@ -2011,7 +2011,7 @@ mod tests {
"datafusion.sql_parser.enable_ident_normalization".to_owned(),
"false".to_owned(),
)]))?;
let mut ctx = SessionContext::with_config(config);
let mut ctx = SessionContext::new_with_config(config);
let name = "aggregate_test_100";
register_aggregate_csv(&mut ctx, name).await?;
let df = ctx.table(name);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod tests {
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::with_config(config);
let session_ctx = SessionContext::new_with_config(config);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = None;
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ mod tests {
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::with_config(config);
let session_ctx = SessionContext::new_with_config(config);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
// skip column 9 that overflows the automaticly discovered column type of i64 (u64 would work)
Expand Down Expand Up @@ -960,7 +960,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
let testdata = arrow_test_data();
ctx.register_csv(
"aggr",
Expand Down Expand Up @@ -997,7 +997,7 @@ mod tests {
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
let testdata = arrow_test_data();
ctx.register_csv(
"aggr",
Expand Down Expand Up @@ -1033,7 +1033,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"empty",
"tests/data/empty_0_byte.csv",
Expand Down Expand Up @@ -1066,7 +1066,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"empty",
"tests/data/empty.csv",
Expand Down Expand Up @@ -1104,7 +1104,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
let file_format = CsvFormat::default().with_has_header(false);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::CSV.get_ext());
Expand Down Expand Up @@ -1157,7 +1157,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
let file_format = CsvFormat::default().with_has_header(false);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::CSV.get_ext());
Expand Down Expand Up @@ -1202,7 +1202,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);

ctx.register_csv(
"one_col",
Expand Down Expand Up @@ -1251,7 +1251,7 @@ mod tests {
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let ctx = SessionContext::with_config(config);
let ctx = SessionContext::new_with_config(config);
ctx.register_csv(
"wide_rows",
"tests/data/wide_rows.csv",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ mod tests {
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::with_config(config);
let session_ctx = SessionContext::new_with_config(config);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = None;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ mod tests {
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::with_config(config);
let session_ctx = SessionContext::new_with_config(config);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let projection = None;
Expand Down Expand Up @@ -1406,7 +1406,7 @@ mod tests {
#[tokio::test]
async fn capture_bytes_scanned_metric() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let session = SessionContext::with_config(config);
let session = SessionContext::new_with_config(config);
let ctx = session.state();

// Read the full file
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ mod tests {
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(cfg)?;
SessionContext::with_config(config)
SessionContext::new_with_config(config)
}
None => SessionContext::new(),
};
Expand Down Expand Up @@ -2046,7 +2046,7 @@ mod tests {
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(cfg)?;
SessionContext::with_config(config)
SessionContext::new_with_config(config)
}
None => SessionContext::new(),
};
Expand Down Expand Up @@ -2252,7 +2252,7 @@ mod tests {
let session_ctx = match session_config_map {
Some(cfg) => {
let config = SessionConfig::from_string_hash_map(cfg)?;
SessionContext::with_config(config)
SessionContext::new_with_config(config)
}
None => SessionContext::new(),
};
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,8 +1079,9 @@ mod tests {
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(8));
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(8),
);

let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?;

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,9 @@ mod tests {
#[tokio::test]
async fn write_json_results() -> Result<()> {
// create partitioned input file and context
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(8));
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(8),
);

let path = format!("{TEST_DATA_BASE}/1.json");

Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1928,8 +1928,9 @@ mod tests {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
// let mut ctx = create_ctx(&tmp_dir, 4).await?;
let ctx =
SessionContext::with_config(SessionConfig::new().with_target_partitions(8));
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_target_partitions(8),
);
let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
// register csv file with the execution context
ctx.register_csv(
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {
#[tokio::test]
async fn issue_3242() -> Result<()> {
// regression test for https://github.com/apache/arrow-datafusion/pull/3242
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -199,7 +199,7 @@ mod tests {

#[tokio::test]
async fn query_view() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -237,7 +237,7 @@ mod tests {

#[tokio::test]
async fn query_view_with_alias() -> Result<()> {
let session_ctx = SessionContext::with_config(SessionConfig::new());
let session_ctx = SessionContext::new_with_config(SessionConfig::new());

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
Expand Down Expand Up @@ -270,7 +270,7 @@ mod tests {

#[tokio::test]
async fn query_view_with_inline_alias() -> Result<()> {
let session_ctx = SessionContext::with_config(SessionConfig::new());
let session_ctx = SessionContext::new_with_config(SessionConfig::new());

session_ctx
.sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
Expand Down Expand Up @@ -303,7 +303,7 @@ mod tests {

#[tokio::test]
async fn query_view_with_projection() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {

#[tokio::test]
async fn query_view_with_filter() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -378,7 +378,7 @@ mod tests {

#[tokio::test]
async fn query_join_views() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -481,7 +481,7 @@ mod tests {

#[tokio::test]
async fn create_view_plan() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down Expand Up @@ -534,7 +534,7 @@ mod tests {

#[tokio::test]
async fn create_or_replace_view() -> Result<()> {
let session_ctx = SessionContext::with_config(
let session_ctx = SessionContext::new_with_config(
SessionConfig::new().with_information_schema(true),
);

Expand Down
Loading