Commit b82ecd4: merge from master

andygrove committed May 9, 2022 (2 parents: 9f7553e + d985c0a)

Showing 41 changed files with 1,915 additions and 177 deletions.

46 changes: 40 additions & 6 deletions ballista/rust/scheduler/src/state/persistent_state.rs

@@ -318,16 +318,21 @@ fn extract_job_id_from_job_key(job_key: &str) -> Result<&str> {

 fn extract_stage_id_from_stage_key(stage_key: &str) -> Result<StageKey> {
     let splits: Vec<&str> = stage_key.split('/').collect();
-    if splits.len() < 4 {
+    if splits.len() > 4 {
+        Ok((
+            splits[splits.len() - 2].to_string(),
+            splits[splits.len() - 1].parse::<u32>().map_err(|e| {
+                BallistaError::Internal(format!(
+                    "Invalid stage ID in stage key: {}, {:?}",
+                    stage_key, e
+                ))
+            })?,
+        ))
+    } else {
         Err(BallistaError::Internal(format!(
             "Unexpected stage key: {}",
             stage_key
         )))
-    } else {
-        Ok((
-            splits.get(2).unwrap().to_string(),
-            splits.get(3).unwrap().parse::<u32>().unwrap(),
-        ))
     }
 }
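
The behavioral change is easy to miss in the diff: the old code required at least four `/`-separated segments and then read the fixed positions `splits[2]` and `splits[3]` with `unwrap()`, which tied the parser to one exact key shape and panicked on a malformed stage ID. The new code reads the last two segments, so extra or empty prefix segments (including the empty element produced by a leading `/`) cannot shift the indices, and a bad stage ID becomes a proper error. A standalone sketch of the revised logic, with `BallistaError` simplified to `String` so it runs outside the crate:

```rust
/// Sketch of the revised parser: the job ID and stage ID are always the
/// last two path segments, regardless of how the key prefix is shaped.
fn extract_stage_id(stage_key: &str) -> Result<(String, u32), String> {
    let splits: Vec<&str> = stage_key.split('/').collect();
    if splits.len() > 4 {
        let job_id = splits[splits.len() - 2].to_string();
        let stage_id = splits[splits.len() - 1]
            .parse::<u32>()
            .map_err(|e| format!("Invalid stage ID in stage key: {}, {:?}", stage_key, e))?;
        Ok((job_id, stage_id))
    } else {
        Err(format!("Unexpected stage key: {}", stage_key))
    }
}

fn main() {
    // "/ballista/default/stages/2Yoyba8/1" splits into
    // ["", "ballista", "default", "stages", "2Yoyba8", "1"]; the leading empty
    // element is why fixed indices 2 and 3 pointed at the wrong segments.
    assert_eq!(
        extract_stage_id("/ballista/default/stages/2Yoyba8/1"),
        Ok(("2Yoyba8".to_string(), 1))
    );
    // A malformed stage ID is now reported as an error instead of panicking.
    assert!(extract_stage_id("/ballista/default/stages/2Yoyba8/oops").is_err());
}
```

The unit tests added in the next hunk exercise exactly these shapes: a key with a leading slash, one without, and one with an empty middle segment.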

@@ -352,3 +357,32 @@ fn encode_protobuf<T: Message + Default>(msg: &T) -> Result<Vec<u8>> {
     })?;
     Ok(value)
 }
+
+#[cfg(test)]
+mod test {
+    use super::extract_stage_id_from_stage_key;
+
+    #[test]
+    fn test_extract_stage_id_from_stage_key() {
+        let (job_id, stage_id) =
+            extract_stage_id_from_stage_key("/ballista/default/stages/2Yoyba8/1")
+                .expect("extracting stage key");
+
+        assert_eq!(job_id.as_str(), "2Yoyba8");
+        assert_eq!(stage_id, 1);
+
+        let (job_id, stage_id) =
+            extract_stage_id_from_stage_key("ballista/default/stages/2Yoyba8/1")
+                .expect("extracting stage key");
+
+        assert_eq!(job_id.as_str(), "2Yoyba8");
+        assert_eq!(stage_id, 1);
+
+        let (job_id, stage_id) =
+            extract_stage_id_from_stage_key("ballista//stages/2Yoyba8/1")
+                .expect("extracting stage key");
+
+        assert_eq!(job_id.as_str(), "2Yoyba8");
+        assert_eq!(stage_id, 1);
+    }
+}

7 changes: 2 additions & 5 deletions datafusion/core/Cargo.toml

@@ -48,8 +48,6 @@ force_hash_collisions = []
 jit = ["datafusion-jit"]
 pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
 regex_expressions = ["datafusion-physical-expr/regex_expressions"]
-# Used to enable row format experiment
-row = ["datafusion-row"]
 # Used to enable scheduler
 scheduler = ["rayon"]
 simd = ["arrow/simd"]

@@ -66,7 +64,7 @@ datafusion-data-access = { path = "../../data-access", version = "1.0.0" }
 datafusion-expr = { path = "../expr", version = "7.0.0" }
 datafusion-jit = { path = "../jit", version = "7.0.0", optional = true }
 datafusion-physical-expr = { path = "../physical-expr", version = "7.0.0" }
-datafusion-row = { path = "../row", version = "7.0.0", optional = true }
+datafusion-row = { path = "../row", version = "7.0.0" }
 futures = "0.3"
 hashbrown = { version = "0.12", features = ["raw"] }
 lazy_static = { version = "^1.4.0" }

@@ -134,8 +132,7 @@ name = "sql_planner"
 [[bench]]
 harness = false
 name = "jit"
-required-features = ["row", "jit"]
+required-features = ["jit"]

 [[test]]
 name = "row"
-required-features = ["row"]

10 changes: 10 additions & 0 deletions datafusion/core/benches/aggregate_query_sql.rs

@@ -133,6 +133,16 @@ fn criterion_benchmark(c: &mut Criterion) {
             )
         })
     });
+
+    c.bench_function("aggregate_query_group_by_u64_multiple_keys", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
+                 FROM t GROUP BY u64_wide, utf8",
+            )
+        })
+    });
 }

 criterion_group!(benches, criterion_benchmark);
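
The new benchmark covers aggregation grouped by two keys of different types (`u64_wide` and `utf8`), a case the existing single-key group-by benchmarks above it do not measure. For reference, here is a minimal sketch of running the same query through the public API; it assumes a `SessionContext` that already has a table `t` with `u64_wide`, `utf8`, and `f64` columns registered, mirroring the benchmark's setup:

```rust
use datafusion::prelude::*;

// Run the same aggregation the benchmark measures and collect the results.
async fn multi_key_aggregate(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let batches = ctx
        .sql(
            "SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) \
             FROM t GROUP BY u64_wide, utf8",
        )
        .await?
        .collect()
        .await?;
    println!("collected {} record batches", batches.len());
    Ok(())
}
```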
100 changes: 2 additions & 98 deletions datafusion/core/src/execution/context.rs

@@ -1603,14 +1603,13 @@ impl FunctionRegistry for TaskContext {
 mod tests {
     use super::*;
     use crate::execution::context::QueryPlanner;
-    use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::make_scalar_function;
     use crate::test;
     use crate::test_util::parquet_test_data;
     use crate::variable::VarType;
     use crate::{
-        assert_batches_eq, assert_batches_sorted_eq,
-        logical_plan::{col, create_udf, sum, Expr},
+        assert_batches_eq,
+        logical_plan::{create_udf, Expr},
     };
     use crate::{logical_plan::create_udaf, physical_plan::expressions::AvgAccumulator};
     use arrow::array::ArrayRef;

@@ -1701,56 +1700,6 @@ mod tests {
         Ok(())
     }

-    #[tokio::test]
-    async fn unprojected_filter() {
-        let ctx = SessionContext::new();
-        let df = ctx
-            .read_table(test::table_with_sequence(1, 3).unwrap())
-            .unwrap();
-
-        let df = df
-            .select(vec![binary_expr(col("i"), Operator::Plus, col("i"))])
-            .unwrap()
-            .filter(col("i").gt(lit(2)))
-            .unwrap();
-        let results = df.collect().await.unwrap();
-
-        let expected = vec![
-            "+--------------------------+",
-            "| ?table?.i Plus ?table?.i |",
-            "+--------------------------+",
-            "| 6                        |",
-            "+--------------------------+",
-        ];
-        assert_batches_sorted_eq!(expected, &results);
-    }
-
-    #[tokio::test]
-    async fn aggregate_with_alias() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let ctx = create_ctx(&tmp_dir, 1).await?;
-
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Utf8, false),
-            Field::new("c2", DataType::UInt32, false),
-        ]));
-
-        let plan = LogicalPlanBuilder::scan_empty(None, schema.as_ref(), None)?
-            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-            .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
-            .build()?;
-
-        let plan = ctx.optimize(&plan)?;
-
-        let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
-        assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
-        assert_eq!(
-            "total_salary",
-            physical_plan.schema().field(1).name().as_str()
-        );
-        Ok(())
-    }
-
     #[tokio::test]
     async fn case_sensitive_identifiers_user_defined_functions() -> Result<()> {
         let mut ctx = SessionContext::new();

@@ -2097,51 +2046,6 @@ mod tests {
         Ok(())
     }

-    #[tokio::test]
-    async fn union_test() -> Result<()> {
-        let ctx = SessionContext::with_config(
-            SessionConfig::new().with_information_schema(true),
-        );
-
-        let result = ctx
-            .sql("SELECT 1 A UNION ALL SELECT 2")
-            .await
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        #[rustfmt::skip]
-        let expected = vec![
-            "+---+",
-            "| a |",
-            "+---+",
-            "| 1 |",
-            "| 2 |",
-            "+---+"
-        ];
-        assert_batches_eq!(expected, &result);
-
-        let result = ctx
-            .sql("SELECT 1 UNION SELECT 2")
-            .await
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        let expected = vec![
-            "+----------+",
-            "| Int64(1) |",
-            "+----------+",
-            "| 1        |",
-            "| 2        |",
-            "+----------+",
-        ];
-        assert_batches_eq!(expected, &result);
-        Ok(())
-    }
-
     #[tokio::test]
     async fn read_with_glob_path() -> Result<()> {
         let ctx = SessionContext::new();

1 change: 0 additions & 1 deletion datafusion/core/src/lib.rs

@@ -233,7 +233,6 @@ pub use datafusion_data_access;
 pub use datafusion_expr as logical_expr;
 pub use datafusion_physical_expr as physical_expr;

-#[cfg(feature = "row")]
 pub use datafusion_row as row;

 #[cfg(feature = "jit")]
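
Together with the Cargo.toml change above, this makes the row format part of every build: `datafusion-row` is now a mandatory dependency and its re-export no longer sits behind a `cfg` gate. A trivial sketch of the consequence for downstream code (before this commit, the import below required building DataFusion with `--features row`):

```rust
// Compiles with default features now that the `row` feature gate is gone.
#[allow(unused_imports)]
use datafusion::row;

fn main() {}
```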
25 changes: 3 additions & 22 deletions datafusion/core/src/physical_plan/aggregates/hash.rs

@@ -28,7 +28,9 @@ use futures::{
 };

 use crate::error::Result;
-use crate::physical_plan::aggregates::{AccumulatorItem, AggregateMode};
+use crate::physical_plan::aggregates::{
+    evaluate, evaluate_many, AccumulatorItem, AggregateMode,
+};
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
 use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};

@@ -380,27 +382,6 @@ impl std::fmt::Debug for Accumulators {
     }
 }

-/// Evaluates expressions against a record batch.
-fn evaluate(
-    expr: &[Arc<dyn PhysicalExpr>],
-    batch: &RecordBatch,
-) -> Result<Vec<ArrayRef>> {
-    expr.iter()
-        .map(|expr| expr.evaluate(batch))
-        .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-        .collect::<Result<Vec<_>>>()
-}
-
-/// Evaluates expressions against a record batch.
-fn evaluate_many(
-    expr: &[Vec<Arc<dyn PhysicalExpr>>],
-    batch: &RecordBatch,
-) -> Result<Vec<Vec<ArrayRef>>> {
-    expr.iter()
-        .map(|expr| evaluate(expr, batch))
-        .collect::<Result<Vec<_>>>()
-}
-
 /// Create a RecordBatch with all group keys and accumulator' states or values.
 fn create_batch_from_map(
     mode: &AggregateMode,
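
Note that `evaluate` and `evaluate_many` were not deleted: per the new import at the top of this file, they now come from the shared `aggregates` module, so other aggregation code paths can reuse a single definition. Their structure is simple; a self-contained sketch with stub types standing in for the arrow/DataFusion ones (an assumption made only so the combinator shape runs on its own):

```rust
use std::sync::Arc;

// Stub types standing in for arrow's ArrayRef/RecordBatch and DataFusion's
// error type; only the combinator structure matters here.
type ArrayRef = Arc<Vec<i64>>;
type RecordBatch = Vec<i64>;
type Result<T> = std::result::Result<T, String>;

trait PhysicalExpr {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
}

/// Evaluate each expression against the batch, failing fast on the first error.
fn evaluate(expr: &[Arc<dyn PhysicalExpr>], batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
    expr.iter().map(|e| e.evaluate(batch)).collect()
}

/// `evaluate_many` is just `evaluate` mapped over the outer slice, which is
/// why one shared definition in the `aggregates` module suffices.
fn evaluate_many(
    expr: &[Vec<Arc<dyn PhysicalExpr>>],
    batch: &RecordBatch,
) -> Result<Vec<Vec<ArrayRef>>> {
    expr.iter().map(|group| evaluate(group, batch)).collect()
}

fn main() {
    struct Identity;
    impl PhysicalExpr for Identity {
        fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
            Ok(Arc::new(batch.clone()))
        }
    }

    let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Identity)];
    let batch: RecordBatch = vec![1, 2, 3];
    assert_eq!(evaluate(&exprs, &batch).unwrap().len(), 1);
    assert_eq!(evaluate_many(&[exprs], &batch).unwrap()[0][0].as_ref(), &vec![1, 2, 3]);
}
```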
(Diffs for the remaining changed files were not loaded.)