Skip to content

Commit

Permalink
[DataFrame] Read files in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 16, 2023
1 parent 413eba1 commit 0dbdd94
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
17 changes: 11 additions & 6 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion_common::alias::AliasGenerator;
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
DescribeTable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
DescribeTable, Partitioning, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
Expand Down Expand Up @@ -917,11 +917,16 @@ impl SessionContext {
/// Creates a [`DataFrame`] for a [`TableProvider`] such as a
/// [`ListingTable`] or a custom user defined provider.
///
/// # Errors
/// Returns an error if the logical plan for the scan (or the added
/// repartition node) cannot be built.
pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> Result<DataFrame> {
    let state = self.state();
    let mut builder =
        LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), None)?;
    // Keep the data in the target number of partitions so downstream
    // operators can execute in parallel. A round-robin repartition is
    // only added when more than one partition is configured; with a
    // single partition it would be pure overhead.
    let target_partitions = state.config.target_partitions();
    if target_partitions > 1 {
        builder =
            builder.repartition(Partitioning::RoundRobinBatch(target_partitions))?;
    }
    Ok(DataFrame::new(state, builder.build()?))
}

/// Creates a [`DataFrame`] for reading a [`RecordBatch`]
Expand Down
18 changes: 16 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ async fn test_grouping_sets() -> Result<()> {

#[tokio::test]
async fn test_grouping_sets_count() -> Result<()> {
let ctx = SessionContext::new();
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::with_config(config);

let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
vec![col("c1")],
Expand Down Expand Up @@ -725,7 +726,8 @@ async fn test_grouping_sets_count() -> Result<()> {

#[tokio::test]
async fn test_grouping_set_array_agg_with_overflow() -> Result<()> {
let ctx = SessionContext::new();
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::with_config(config);

let grouping_set_expr = Expr::GroupingSet(GroupingSet::GroupingSets(vec![
vec![col("c1")],
Expand Down Expand Up @@ -795,6 +797,18 @@ async fn test_grouping_set_array_agg_with_overflow() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_read_partitioned() -> Result<()> {
    // With target_partitions > 1, reading a table should yield a physical
    // plan whose output is split across that many partitions.
    let ctx =
        SessionContext::with_config(SessionConfig::new().with_target_partitions(4));

    let physical_plan = aggregates_table(&ctx)
        .await?
        .create_physical_plan()
        .await?;

    assert_eq!(physical_plan.output_partitioning().partition_count(), 4);
    Ok(())
}

#[tokio::test]
async fn join_with_alias_filter() -> Result<()> {
let join_ctx = create_join_context()?;
Expand Down

0 comments on commit 0dbdd94

Please sign in to comment.