diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index d1401a6589..4bfc133ef8 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,11 +28,14 @@ use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::SessionState;
+#[cfg(test)]
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::logical_expr::{Expr, expr::Cast};
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
+#[cfg(test)]
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
+#[cfg(test)]
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
 use datafusion::{
@@ -60,6 +63,7 @@ use std::{
     time::{Instant, SystemTime},
 };
 use structopt::StructOpt;
+#[cfg(test)]
 use tokio::task::JoinHandle;
 
 #[cfg(feature = "snmalloc")]
@@ -72,9 +76,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(Debug, StructOpt, Clone)]
 struct BallistaBenchmarkOpt {
-    /// Query number
+    /// Query number (1-22). If not specified, runs all queries.
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -122,9 +126,9 @@ struct BallistaBenchmarkOpt {
 
 #[derive(Debug, StructOpt, Clone)]
 struct DataFusionBenchmarkOpt {
-    /// Query number
+    /// Query number (1-22). If not specified, runs all queries.
     #[structopt(short, long)]
-    query: usize,
+    query: Option<usize>,
 
     /// Activate debug mode to see query results
     #[structopt(short, long)]
@@ -283,7 +287,6 @@ async fn main() -> Result<()> {
 #[allow(clippy::await_holding_lock)]
 async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
     println!("Running benchmarks with the following options: {opt:?}");
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
     let config = SessionConfig::new()
         .with_target_partitions(opt.partitions)
         .with_batch_size(opt.batch_size);
@@ -319,30 +322,65 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
 
-    let mut millis = vec![];
+    // Determine which queries to run
+    let query_numbers: Vec<usize> = opt
+        .query
+        .map(|q| vec![q])
+        .unwrap_or_else(|| (1..=22).collect());
+
+    let mut benchmark_run = BenchmarkRun::new();
     let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        let plans = create_logical_plans(&ctx, opt.query).await?;
-        for plan in plans {
-            result = execute_query(&ctx, &plan, opt.debug).await?;
+
+    for query in query_numbers {
+        let mut query_run = QueryRun::new(query);
+        let mut millis = vec![];
+
+        // run benchmark
+        let sqls = get_query_sql(query)?;
+        if opt.debug {
+            println!("Query {query}:\n{sqls:?}");
+        }
+        for i in 0..opt.iterations {
+            let start = Instant::now();
+            // Execute each SQL statement sequentially (required for queries like q15
+            // that create views and then reference them)
+            for sql in &sqls {
+                if opt.debug {
+                    println!("Executing: {sql}");
+                }
+                let df = ctx.sql(sql).await?;
+                result = df.collect().await?;
+            }
+            let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+            if opt.debug {
+                pretty::print_batches(&result)?;
+            }
+            millis.push(elapsed);
+            let row_count = result.iter().map(|b| b.num_rows()).sum();
+            if opt.iterations == 1 {
+                println!(
+                    "Query {} took {:.1} ms and returned {} rows",
+                    query, elapsed, row_count
+                );
+            } else {
+                println!(
+                    "Query {} iteration {} took {:.1} ms and returned {} rows",
+                    query, i, elapsed, row_count
+                );
+            }
+
+            query_run.add_result(elapsed, row_count);
         }
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed);
-        let row_count = result.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
-        );
-        benchmark_run.add_result(elapsed, row_count);
-    }
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+        if opt.iterations > 1 {
+            let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+            println!("Query {} avg time: {:.1} ms", query, avg);
+        }
+
+        benchmark_run.add_query_run(query_run);
+    }
 
     if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
+        write_summary_json(&benchmark_run, path)?;
     }
 
     Ok(result)
@@ -350,93 +388,112 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
 
 async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
     println!("Running benchmarks with the following options: {opt:?}");
-    let mut benchmark_run = BenchmarkRun::new(opt.query);
-    let config = SessionConfig::new_with_ballista()
-        .with_target_partitions(opt.partitions)
-        .with_ballista_job_name(&format!("Query derived from TPC-H q{}", opt.query))
-        .with_batch_size(opt.batch_size)
-        .with_collect_statistics(true);
-
-    let state = SessionStateBuilder::new()
-        .with_default_features()
-        .with_config(config)
-        .build();
 
     let address = format!(
         "df://{}:{}",
         opt.host.clone().unwrap().as_str(),
         opt.port.unwrap()
     );
-    let ctx = SessionContext::remote_with_state(&address, state).await?;
 
-    // register tables with Ballista context
-    let path = opt.path.to_str().unwrap();
-    let file_format = opt.file_format.as_str();
+    // Determine which queries to run
+    let query_numbers: Vec<usize> = opt
+        .query
+        .map(|q| vec![q])
+        .unwrap_or_else(|| (1..=22).collect());
 
-    register_tables(path, file_format, &ctx, opt.debug).await?;
+    let mut benchmark_run = BenchmarkRun::new();
 
-    let mut millis = vec![];
+    for query in query_numbers {
+        let mut query_run = QueryRun::new(query);
 
-    // run benchmark
-    let queries = get_query_sql(opt.query)?;
-    println!(
-        "Running benchmark with queries {}:\n {:?}",
-        opt.query, queries
-    );
-    let mut batches = vec![];
-    for i in 0..opt.iterations {
-        let start = Instant::now();
-        for sql in &queries {
-            let df = ctx
-                .sql(sql)
-                .await
-                .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
-                .unwrap();
-            let plan = df.clone().into_optimized_plan()?;
+        let config = SessionConfig::new_with_ballista()
+            .with_target_partitions(opt.partitions)
+            .with_ballista_job_name(&format!("Query derived from TPC-H q{}", query))
+            .with_batch_size(opt.batch_size)
+            .with_collect_statistics(true);
+
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config)
+            .build();
+        let ctx = SessionContext::remote_with_state(&address, state).await?;
+
+        // register tables with Ballista context
+        let path = opt.path.to_str().unwrap();
+        let file_format = opt.file_format.as_str();
+
+        register_tables(path, file_format, &ctx, opt.debug).await?;
+
+        let mut millis = vec![];
+
+        // run benchmark
+        let sqls = get_query_sql(query)?;
+        if opt.debug {
+            println!("Running benchmark with query {}:\n {:?}", query, sqls);
+        }
+        let mut batches = vec![];
+        for i in 0..opt.iterations {
+            let start = Instant::now();
+            for sql in &sqls {
+                let df = ctx
+                    .sql(sql)
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+                    .unwrap();
+                let plan = df.clone().into_optimized_plan()?;
+                if opt.debug {
+                    println!("=== Optimized logical plan ===\n{plan:?}\n");
+                }
+                batches = df
+                    .collect()
+                    .await
+                    .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
+                    .unwrap();
+            }
+            let elapsed = start.elapsed().as_secs_f64() * 1000.0;
+            millis.push(elapsed);
+            let row_count = batches.iter().map(|b| b.num_rows()).sum();
+            if opt.iterations == 1 {
+                println!(
+                    "Query {} took {:.1} ms and returned {} rows",
+                    query, elapsed, row_count
+                );
+            } else {
+                println!(
+                    "Query {} iteration {} took {:.1} ms and returned {} rows",
+                    query, i, elapsed, row_count
+                );
+            }
+            query_run.add_result(elapsed, row_count);
             if opt.debug {
-                println!("=== Optimized logical plan ===\n{plan:?}\n");
+                pretty::print_batches(&batches)?;
+            }
+
+            if let Some(expected_results_path) = opt.expected_results.as_ref() {
+                let expected = get_expected_results(query, expected_results_path).await?;
+                assert_expected_results(&expected, &batches)
             }
-            batches = df
-                .collect()
-                .await
-                .map_err(|e| DataFusionError::Plan(format!("{e:?}")))
-                .unwrap();
         }
-        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
-        millis.push(elapsed);
-        let row_count = batches.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Query {} iteration {} took {:.1} ms and returned {} rows",
-            opt.query, i, elapsed, row_count
-        );
-        benchmark_run.add_result(elapsed, row_count);
-        if opt.debug {
-            pretty::print_batches(&batches)?;
-        }
 
-        if let Some(expected_results_path) = opt.expected_results.as_ref() {
-            let expected = get_expected_results(opt.query, expected_results_path).await?;
-            assert_expected_results(&expected, &batches)
+        if opt.iterations > 1 {
+            let avg = millis.iter().sum::<f64>() / millis.len() as f64;
+            println!("Query {} avg time: {:.1} ms", query, avg);
         }
-    }
-    let avg = millis.iter().sum::<f64>() / millis.len() as f64;
-    println!("Query {} avg time: {:.2} ms", opt.query, avg);
+        benchmark_run.add_query_run(query_run);
+    }
 
     if let Some(path) = &opt.output_path {
-        write_summary_json(&mut benchmark_run, path)?;
+        write_summary_json(&benchmark_run, path)?;
     }
 
     Ok(())
 }
 
-fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
+fn write_summary_json(benchmark_run: &BenchmarkRun, path: &Path) -> Result<()> {
     let json = serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
-    let filename = format!(
-        "tpch-q{}-{}.json",
-        benchmark_run.query, benchmark_run.start_time
-    );
+    let filename = format!("tpch-{}.json", benchmark_run.start_time);
     let path = path.join(filename);
     println!(
         "Writing summary file to {}",
@@ -662,6 +719,7 @@ fn get_query_sql(query: usize) -> Result<Vec<String>> {
 }
 
 /// Create a logical plan for each query in the specified query file
+#[cfg(test)]
 async fn create_logical_plans(
     ctx: &SessionContext,
     query: usize,
@@ -687,6 +745,7 @@ async fn create_logical_plans(
     .collect()
 }
 
+#[cfg(test)]
 async fn execute_query(
     ctx: &SessionContext,
     plan: &LogicalPlan,
@@ -833,7 +892,7 @@ async fn get_table(
             )
         }
         "parquet" => {
-            let path = format!("{path}/{table}");
+            let path = find_path(path, table, "parquet")?;
             let format = ParquetFormat::default().with_enable_pruning(true);
 
             (
@@ -971,6 +1030,27 @@ pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
     schema.finish()
 }
 
+#[derive(Debug, Serialize)]
+struct QueryRun {
+    /// query number
+    query: usize,
+    /// list of individual run times and row counts
+    iterations: Vec<QueryResult>,
+}
+
+impl QueryRun {
+    fn new(query: usize) -> Self {
+        Self {
+            query,
+            iterations: vec![],
+        }
+    }
+
+    fn add_result(&mut self, elapsed: f64, row_count: usize) {
+        self.iterations.push(QueryResult { elapsed, row_count })
+    }
+}
+
 #[derive(Debug, Serialize)]
 struct BenchmarkRun {
     /// Benchmark crate version
@@ -983,14 +1063,12 @@ struct BenchmarkRun {
     start_time: u64,
     /// CLI arguments
     arguments: Vec<String>,
-    /// query number
-    query: usize,
-    /// list of individual run times and row counts
-    iterations: Vec<QueryResult>,
+    /// Results for each query
+    queries: Vec<QueryRun>,
 }
 
 impl BenchmarkRun {
-    fn new(query: usize) -> Self {
+    fn new() -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -1000,13 +1078,12 @@ impl BenchmarkRun {
                 .expect("current time is later than UNIX_EPOCH")
                 .as_secs(),
             arguments: std::env::args().skip(1).collect::<Vec<String>>(),
-            query,
-            iterations: vec![],
+            queries: vec![],
         }
     }
 
-    fn add_result(&mut self, elapsed: f64, row_count: usize) {
-        self.iterations.push(QueryResult { elapsed, row_count })
+    fn add_query_run(&mut self, query_run: QueryRun) {
+        self.queries.push(query_run)
     }
 }
 
@@ -1637,7 +1714,7 @@ mod tests {
 
         // run the query to compute actual results of the query
         let opt = DataFusionBenchmarkOpt {
-            query: Some(n),
+            query: Some(n),
             debug: false,
            iterations: 1,
             partitions: 2,