Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions rust/benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

use arrow::datatypes::{DataType, DateUnit, Field, Schema};
Expand All @@ -28,7 +27,6 @@ use datafusion::datasource::{CsvFile, MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::prelude::*;

use parquet::basic::Compression;
Expand Down Expand Up @@ -87,6 +85,10 @@ struct ConvertOpt {
/// Compression to use when writing Parquet files
#[structopt(short = "c", long = "compression", default_value = "snappy")]
compression: String,

/// Number of partitions to produce
#[structopt(short = "p", long = "partitions", default_value = "1")]
partitions: usize,
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -1017,8 +1019,21 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
.delimiter(b'|')
.file_extension(".tbl");

let ctx = ExecutionContext::new();
let csv = Arc::new(CsvExec::try_new(&input_path, options, None, 4096)?);
let mut ctx = ExecutionContext::new();

// build plan to read the TBL file
let mut csv = ctx.read_csv(&input_path, options)?;

// optionally, repartition the file
if opt.partitions > 1 {
csv = csv.repartition(Partitioning::RoundRobinBatch(opt.partitions))?
}

// create the physical plan
let csv = csv.to_logical_plan();
let csv = ctx.optimize(&csv)?;
let csv = ctx.create_physical_plan(&csv)?;

let output_path = output_root_path.join(table);
let output_path = output_path.to_str().unwrap().to_owned();

Expand Down
21 changes: 20 additions & 1 deletion rust/datafusion/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan};
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
};
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -172,6 +174,23 @@ pub trait DataFrame {
right_cols: &[&str],
) -> Result<Arc<dyn DataFrame>>;

/// Repartition a DataFrame based on a logical partitioning scheme.
///
/// ```
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # fn main() -> Result<()> {
/// let mut ctx = ExecutionContext::new();
/// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new())?;
/// let df1 = df.repartition(Partitioning::RoundRobinBatch(4))?;
/// # Ok(())
/// # }
/// ```
fn repartition(
&self,
partitioning_scheme: Partitioning,
) -> Result<Arc<dyn DataFrame>>;

/// Executes this DataFrame and collects all results into a vector of RecordBatch.
///
/// ```
Expand Down
11 changes: 11 additions & 0 deletions rust/datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use crate::{arrow::record_batch::RecordBatch, physical_plan::collect};

Expand Down Expand Up @@ -111,6 +112,16 @@ impl DataFrame for DataFrameImpl {
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

fn repartition(
&self,
partitioning_scheme: Partitioning,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this introduces a new naming, partitioning_scheme.

We have:

  • partition
  • partitioning
  • partitioning_scheme
  • repartition
  • part

I do not know the common notation, but we could try to reduce the number of different names we use.

In my (little) understanding:

  • data is partitioned according to a partition
  • partitioned data is divided in parts
  • we can repartition it according to a new partition.

In this understanding, I would replace partitioning and partitioning_scheme by partition.

Even if this understanding is not correct, maybe we could reduce the number of different names?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree keeping the number of different names low is important

I suggest using

  • partition to refer to an actual portion of the data (in a bunch of RecordBatches)
  • partitioning to refer to the "schema" of how the data is divided into partitions (the use of the Partitioning scheme now)

Thus we would repartition the data into a new partitioning

) -> Result<Arc<dyn DataFrame>> {
let plan = LogicalPlanBuilder::from(&self.plan)
.repartition(partitioning_scheme)?
.build()?;
Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
}

/// Convert to logical plan
fn to_logical_plan(&self) -> LogicalPlan {
self.plan.clone()
Expand Down
10 changes: 9 additions & 1 deletion rust/datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use super::dfschema::ToDFSchema;
use super::{
col, exprlist_to_fields, Expr, JoinType, LogicalPlan, PlanType, StringifiedPlan,
};
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef};
use crate::logical_plan::{DFField, DFSchema, DFSchemaRef, Partitioning};
use std::collections::HashSet;

/// Builder for logical plans
Expand Down Expand Up @@ -207,6 +207,14 @@ impl LogicalPlanBuilder {
}
}

/// Repartition
pub fn repartition(&self, partitioning_scheme: Partitioning) -> Result<Self> {
Ok(Self::from(&LogicalPlan::Repartition {
input: Arc::new(self.plan.clone()),
partitioning_scheme,
}))
}

/// Apply an aggregate
pub fn aggregate(&self, group_expr: Vec<Expr>, aggr_expr: Vec<Expr>) -> Result<Self> {
let mut all_expr: Vec<Expr> = group_expr.clone();
Expand Down
4 changes: 3 additions & 1 deletion rust/datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ pub use expr::{
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{JoinType, LogicalPlan, PlanType, PlanVisitor, StringifiedPlan};
pub use plan::{
JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor, StringifiedPlan,
};
pub use registry::FunctionRegistry;
44 changes: 43 additions & 1 deletion rust/datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ pub enum LogicalPlan {
/// The output schema, containing fields from the left and right inputs
schema: DFSchemaRef,
},
/// Repartition the plan based on a partitioning scheme.
Repartition {
/// The incoming logical plan
input: Arc<LogicalPlan>,
/// The partitioning scheme
partitioning_scheme: Partitioning,
},
/// Produces rows from a table provider by reference or from the context
TableScan {
/// The name of the table
Expand Down Expand Up @@ -182,6 +189,7 @@ impl LogicalPlan {
LogicalPlan::Aggregate { schema, .. } => &schema,
LogicalPlan::Sort { input, .. } => input.schema(),
LogicalPlan::Join { schema, .. } => &schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => &schema,
LogicalPlan::Explain { schema, .. } => &schema,
Expand All @@ -198,6 +206,17 @@ impl LogicalPlan {
}
}

/// Logical partitioning schemes supported by the repartition operator.
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified number
/// of partitions.
/// This partitioning scheme is not yet fully supported. See https://issues.apache.org/jira/browse/ARROW-11011
Hash(Vec<Expr>, usize),
}

/// Trait that implements the [Visitor
/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
/// depth first walk of `LogicalPlan` nodes. `pre_visit` is called
Expand Down Expand Up @@ -261,6 +280,7 @@ impl LogicalPlan {
let recurse = match self {
LogicalPlan::Projection { input, .. } => input.accept(visitor)?,
LogicalPlan::Filter { input, .. } => input.accept(visitor)?,
LogicalPlan::Repartition { input, .. } => input.accept(visitor)?,
LogicalPlan::Aggregate { input, .. } => input.accept(visitor)?,
LogicalPlan::Sort { input, .. } => input.accept(visitor)?,
LogicalPlan::Join { left, right, .. } => {
Expand Down Expand Up @@ -464,7 +484,7 @@ impl LogicalPlan {
struct Wrapper<'a>(&'a LogicalPlan);
impl<'a> fmt::Display for Wrapper<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self.0 {
match &*self.0 {
LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"),
LogicalPlan::TableScan {
ref table_name,
Expand Down Expand Up @@ -523,6 +543,28 @@ impl LogicalPlan {
keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
write!(f, "Join: {}", join_expr.join(", "))
}
LogicalPlan::Repartition {
partitioning_scheme,
..
} => match partitioning_scheme {
Partitioning::RoundRobinBatch(n) => {
write!(
f,
"Repartition: RoundRobinBatch partition_count={}",
n
)
}
Partitioning::Hash(expr, n) => {
let hash_expr: Vec<String> =
expr.iter().map(|e| format!("{:?}", e)).collect();
write!(
f,
"Repartition: Hash({}) partition_count={}",
hash_expr.join(", "),
n
)
}
},
LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n),
LogicalPlan::CreateExternalTable { ref name, .. } => {
write!(f, "CreateExternalTable: {:?}", name)
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/optimizer/hash_build_probe_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl OptimizerRule for HashBuildProbeOrder {
| LogicalPlan::TableScan { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
Expand Down
1 change: 1 addition & 0 deletions rust/datafusion/src/optimizer/projection_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ fn optimize_plan(
// expressions in this node to the list of required columns
LogicalPlan::Limit { .. }
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable { .. }
Expand Down
23 changes: 22 additions & 1 deletion rust/datafusion/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::datatypes::Schema;
use super::optimizer::OptimizerRule;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
Expr, LogicalPlan, Operator, PlanType, StringifiedPlan, ToDFSchema,
Expr, LogicalPlan, Operator, Partitioning, PlanType, StringifiedPlan, ToDFSchema,
};
use crate::prelude::{col, lit};
use crate::scalar::ScalarValue;
Expand Down Expand Up @@ -140,6 +140,13 @@ pub fn expressions(plan: &LogicalPlan) -> Vec<Expr> {
match plan {
LogicalPlan::Projection { expr, .. } => expr.clone(),
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
LogicalPlan::Repartition {
partitioning_scheme,
..
} => match partitioning_scheme {
Partitioning::Hash(expr, _) => expr.clone(),
_ => vec![],
},
LogicalPlan::Aggregate {
group_expr,
aggr_expr,
Expand Down Expand Up @@ -168,6 +175,7 @@ pub fn inputs(plan: &LogicalPlan) -> Vec<&LogicalPlan> {
match plan {
LogicalPlan::Projection { input, .. } => vec![input],
LogicalPlan::Filter { input, .. } => vec![input],
LogicalPlan::Repartition { input, .. } => vec![input],
LogicalPlan::Aggregate { input, .. } => vec![input],
LogicalPlan::Sort { input, .. } => vec![input],
LogicalPlan::Join { left, right, .. } => vec![left, right],
Expand Down Expand Up @@ -197,6 +205,19 @@ pub fn from_plan(
predicate: expr[0].clone(),
input: Arc::new(inputs[0].clone()),
}),
LogicalPlan::Repartition {
partitioning_scheme,
..
} => match partitioning_scheme {
Partitioning::RoundRobinBatch(n) => Ok(LogicalPlan::Repartition {
partitioning_scheme: Partitioning::RoundRobinBatch(*n),
input: Arc::new(inputs[0].clone()),
}),
Partitioning::Hash(_, n) => Ok(LogicalPlan::Repartition {
partitioning_scheme: Partitioning::Hash(expr.to_owned(), *n),
input: Arc::new(inputs[0].clone()),
}),
},
LogicalPlan::Aggregate {
group_expr, schema, ..
} => Ok(LogicalPlan::Aggregate {
Expand Down
11 changes: 10 additions & 1 deletion rust/datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,13 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Unknown partitioning scheme
/// Allocate batches using a round-robin algorithm and the specified number of partitions
RoundRobinBatch(usize),
/// Allocate rows based on a hash of one of more expressions and the specified
/// number of partitions
/// This partitioning scheme is not yet fully supported. See https://issues.apache.org/jira/browse/ARROW-11011
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
}

Expand All @@ -116,6 +122,8 @@ impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) => *n,
Hash(_, n) => *n,
UnknownPartitioning(n) => *n,
}
}
Expand Down Expand Up @@ -260,6 +268,7 @@ pub mod merge;
pub mod parquet;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod sort;
pub mod string_expressions;
pub mod type_coercion;
Expand Down
32 changes: 29 additions & 3 deletions rust/datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@ use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf};
use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::{
DFSchema, Expr, LogicalPlan, Operator, PlanType, StringifiedPlan,
UserDefinedLogicalNode,
DFSchema, Expr, LogicalPlan, Operator, Partitioning as LogicalPartitioning, PlanType,
StringifiedPlan, UserDefinedLogicalNode,
};
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{CaseExpr, Column, Literal, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::hash_utils;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::udf;
use crate::physical_plan::{expressions, Distribution};
use crate::physical_plan::{hash_utils, Partitioning};
use crate::physical_plan::{AggregateExpr, ExecutionPlan, PhysicalExpr, PhysicalPlanner};
use crate::prelude::JoinType;
use crate::variable::VarType;
Expand Down Expand Up @@ -228,6 +229,31 @@ impl DefaultPhysicalPlanner {
self.create_physical_expr(predicate, &input_schema, ctx_state)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?))
}
LogicalPlan::Repartition {
input,
partitioning_scheme,
} => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.schema();
let physical_partitioning = match partitioning_scheme {
LogicalPartitioning::RoundRobinBatch(n) => {
Partitioning::RoundRobinBatch(*n)
}
LogicalPartitioning::Hash(expr, n) => {
let runtime_expr = expr
.iter()
.map(|e| {
self.create_physical_expr(e, &input_schema, &ctx_state)
})
.collect::<Result<Vec<_>>>()?;
Partitioning::Hash(runtime_expr, *n)
}
};
Ok(Arc::new(RepartitionExec::try_new(
input,
physical_partitioning,
)?))
}
LogicalPlan::Sort { expr, input, .. } => {
let input = self.create_physical_plan(input, ctx_state)?;
let input_schema = input.as_ref().schema();
Expand Down
Loading