Skip to content

Commit

Permalink
Add Display to Sink and update plans
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed May 17, 2023
1 parent 3acd24d commit 0e20c08
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
11 changes: 9 additions & 2 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use futures::StreamExt;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Debug, Display};
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -201,13 +201,20 @@ struct MemSink {
}

impl Debug for MemSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MemSink")
.field("num_partitions", &self.batches.len())
.finish()
}
}

impl Display for MemSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let partition_count = self.batches.len();
write!(f, "MemoryTable (partitions={partition_count})")
}
}

impl MemSink {
fn new(batches: Vec<PartitionData>) -> Self {
Self { batches }
Expand Down
14 changes: 7 additions & 7 deletions datafusion/core/src/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use async_trait::async_trait;
use core::fmt;
use futures::StreamExt;
use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use crate::execution::context::TaskContext;
Expand All @@ -38,9 +39,12 @@ use crate::physical_plan::Distribution;
use datafusion_common::DataFusionError;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// destinations.
/// user defined destinations.
///
/// The `Display` impl is used to format the sink for explain plan
/// output.
#[async_trait]
pub trait DataSink: std::fmt::Debug + Send + Sync {
pub trait DataSink: Display + Debug + Send + Sync {
// TODO add desired input ordering
// How does this sink want its input ordered?

Expand Down Expand Up @@ -164,11 +168,7 @@ impl ExecutionPlan for InsertExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"InsertExec: input_partition_count={}",
self.input.output_partitioning().partition_count()
)
write!(f, "InsertExec: sink={}", self.sink)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/sqllogictests/test_files/insert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Dml: op=[Insert] table=[table_without_values]
--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: input_partition_count=1
InsertExec: sink=MemoryTable (partitions=1)
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]
Expand Down Expand Up @@ -106,7 +106,7 @@ Dml: op=[Insert] table=[table_without_values]
----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: input_partition_count=1
InsertExec: sink=MemoryTable (partitions=1)
--CoalescePartitionsExec
----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
Expand Down Expand Up @@ -156,7 +156,7 @@ Dml: op=[Insert] table=[table_without_values]
--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: input_partition_count=1
InsertExec: sink=MemoryTable (partitions=8)
--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
Expand Down

0 comments on commit 0e20c08

Please sign in to comment.