
INSERT returns number of rows written, add InsertExec to handle common case. #6354

Merged: 10 commits into apache:main from alamb/refactor_insert, May 19, 2023

Conversation

alamb (Contributor) commented May 15, 2023

Which issue does this PR close?

Part of #6339

Rationale for this change

I want to make it easier to add support for writing to different data sources (I want to implement COPY ... TO ... support in #5654, and @JanKaul is working to implement a delta.rs target).

Initially, I proposed changing the API on TableProvider in #6339 (see #6347), but as the discussion with @ozankabak and @tustvold revealed, keeping the full flexibility of an ExecutionPlan if/until we better understand the requirements seems a better course of action.

However, I think helping people (like myself) implement ExecutionPlans (and avoid boilerplate) is still quite valuable.

What changes are included in this PR?

  1. Add documentation to TableProvider::insert_into
  2. Add DataSink API (thanks @tustvold and @JanKaul for this discussion); see the sketch after this list
  3. Refactor the implementation of MemWriteExec into InsertExec and a DataSink API
  4. Port explain / insert tests to sqllogictests
  5. Add tests for counts
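
Item 2's DataSink boils down to a single async trait. A minimal sketch of its shape follows; the write_all name and exact signature are assumptions based on the discussion in this PR, not a verbatim copy of the merged code:

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;

/// Sketch of the DataSink shape: a sink consumes a stream of
/// RecordBatches and reports how many rows it wrote, which is what
/// lets INSERT return a row count.
#[async_trait]
pub trait DataSink: std::fmt::Debug + Send + Sync {
    /// Write the entire stream, returning the number of rows written
    async fn write_all(&self, data: SendableRecordBatchStream) -> Result<u64>;
}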

Are these changes tested?

Yes

Are there any user-facing changes?

There is a new InsertExec operator available for users
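
As a usage sketch (the InsertExec::new constructor and the physical_plan::insert module path are assumptions based on this PR, not guaranteed API):

use std::sync::Arc;
use datafusion::physical_plan::insert::{DataSink, InsertExec};
use datafusion::physical_plan::ExecutionPlan;

// A minimal sketch, assuming an `InsertExec::new(input, sink)` constructor;
// executing the returned plan drives `input` into `sink` and yields a
// single batch containing the row count.
fn make_insert_plan(
    input: Arc<dyn ExecutionPlan>,
    sink: Arc<dyn DataSink>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(InsertExec::new(input, sink))
}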

@github-actions bot added labels core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) on May 15, 2023
@@ -480,8 +512,8 @@ mod tests {

  // Execute the physical plan and collect the results
  let res = collect(plan, session_ctx.task_ctx()).await?;
- // Ensure the result is empty after the insert operation
- assert!(res.is_empty());
+ assert_eq!(extract_count(res), expected_count);
alamb (author) commented:

here is the test for the new feature of returning row counts
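
The extract_count helper is not shown in this hunk; the following is a plausible reconstruction (the name comes from the diff, the body is an assumption), pulling the single count value out of the results:

use datafusion::arrow::array::UInt64Array;
use datafusion::arrow::record_batch::RecordBatch;

// Hypothetical reconstruction of the test helper: the insert plan is
// expected to return a single batch with one UInt64 count column.
fn extract_count(res: Vec<RecordBatch>) -> u64 {
    assert_eq!(res.len(), 1, "expected one batch with the row count");
    let batch = &res[0];
    assert_eq!(batch.num_columns(), 1, "expected a single count column");
    batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .expect("count column should be UInt64")
        .value(0)
}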


// Test the less-lock mode by inserting a large number of batches into a table.
#[tokio::test]
async fn test_one_to_one_mode() -> Result<()> {
alamb (author) commented:

Given there are no longer any locking differences between the different partitioning combinations of insert, I don't think these tests add any coverage that is not already in

https://github.com/apache/arrow-datafusion/blob/eb918ab217213d5e07e71e53c118a8409d2f71a0/datafusion/core/src/datasource/memory.rs#L455-L476

so I opted to remove them.

}

#[tokio::test]
async fn test_insert_into() -> Result<()> {
alamb (author) commented:

These tests are ported to insert.slt

let mut new_batches = vec![vec![]; num_partitions];
let mut i = 0;
let mut row_count = 0;
while let Some(batch) = data.next().await.transpose()? {
alamb (author) commented:

I think this formulation of MemTable insert is easier to understand, as the code now lives in the same place as the code that defines the rest of the data structures.
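
For context, the body of that while loop is elided from the hunk above; a sketch of what it plausibly does (a reconstruction, not the verbatim diff) is to round-robin incoming batches across the partitions while tallying rows:

// sketch: distribute batches across `num_partitions` buffers
// round-robin, counting rows so the total can be reported at the end
while let Some(batch) = data.next().await.transpose()? {
    row_count += batch.num_rows();
    new_batches[i].push(batch);
    i = (i + 1) % num_partitions;
}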

}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition]
A reviewer commented:

I think this is the major change in this PR: we no longer attempt to expose partitioning. I am 👍 on this change.

alamb (author) replied May 15, 2023:

But to be clear, since the TableProvider API remains in terms of ExecutionPlan, if someone wanted more control over the partitioning they could have it (by defining their own ExecutionPlan).
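
A minimal sketch of that escape hatch (a hypothetical custom write plan, not code from this PR), loosening InsertExec's single-partition requirement:

// hypothetical custom write ExecutionPlan: leave the input's
// partitioning alone so each partition can be written in parallel
fn required_input_distribution(&self) -> Vec<Distribution> {
    vec![Distribution::UnspecifiedDistribution]
}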

// write the outputs into the batches
for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
// Append all the new batches in one go to minimize locking overhead
target.write().await.append(&mut batches);
tustvold (Contributor) commented May 15, 2023:

I wonder if we can now remove the per-partition locks 🤔

Possibly even switching to a non-async lock...
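
A sketch of the idea (hypothetical types; it assumes the lock is only held briefly and never across an .await): once each partition's batches are appended in one go, a synchronous lock suffices.

use std::sync::RwLock;
use datafusion::arrow::record_batch::RecordBatch;

// hypothetical partition storage: a non-async lock is fine when the
// critical section is short and contains no await points
struct Partition {
    batches: RwLock<Vec<RecordBatch>>,
}

impl Partition {
    fn append(&self, mut new: Vec<RecordBatch>) {
        // short critical section; all new batches appended in one go
        self.batches.write().unwrap().append(&mut new);
    }
}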

alamb commented May 15, 2023:

cc @ozankabak and @metesynnada and @JanKaul

JanKaul (Contributor) commented May 16, 2023:

Looks great, thank you @alamb !

----
100

statement ok
A reviewer commented:

Maybe you can add a query like

SELECT * FROM table_without_values
LIMIT 5

to test whether the result is written to the table before dropping the table.

alamb (author) replied:

in b1464da

/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// destinations.
#[async_trait]
pub trait DataSink: std::fmt::Debug + Send + Sync {
A reviewer commented:

Maybe we can add a new trait method such as

fn summary(&self) -> String;

Each implementation of DataSink can implement it. That way we can enrich the information in the InsertExec plan. For instance, we could indicate that the sink is in fact a MemSink, or, if it is a CsvSink (future), that it writes its result to path: a/b/c.csv.

A reviewer replied:

Or just implement std::fmt::Display perhaps?

A reviewer replied:

Yes, then we can extend the fmt_as method of InsertExec to print self.sink

alamb (author) replied:

in 0e20c08
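
The resolved pattern is visible in the explain output later in this thread (InsertExec: sink=MemoryTable (partitions=1)). A minimal sketch of the shape, with an illustrative sink type:

use std::fmt;

// illustrative sink: the Display impl is what InsertExec's fmt_as
// surfaces as "InsertExec: sink=MemoryTable (partitions=1)"
#[derive(Debug)]
struct MemSink {
    partitions: usize,
}

impl fmt::Display for MemSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "MemoryTable (partitions={})", self.partitions)
    }
}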

}

fn maintains_input_order(&self) -> Vec<bool> {
vec![false]
A reviewer commented:

If we think of the written files as the output of the executor, we indeed preserve the ordering. If we make this flag true, then when the written table is used in another query, the fact that the table is ordered will enable additional optimizations. This will affect the final plan if there is an existing ordering (determining whether to put a CoalesceBatchesExec or a SortPreservingMergeExec). What do you think about this?

alamb (author) replied:

🤔

I am thinking that InsertExec is going to be at the top of the plan (I don't expect it to feed its output to another ExecutionPlan), and it produces metadata about the write (the count), so I don't think marking maintains_input_order true makes logical sense. Marking maintains_input_order false means the optimizer has maximum flexibility (it doesn't have to retain ordering).

However, perhaps you are imagining a case like

-- user is trying to specify that `new_table` should be ordered by `x`
INSERT INTO new_table SELECT * from old_table ORDER BY x

Trying to use the ORDER BY of the input as a hint might cause problems with tables that already have data, such as

INSERT INTO new_table SELECT * from old_table ORDER BY x;
-- A second insert of the same data means new_table won't be 
-- sorted by x unless extra care is taken somehow within new_table
INSERT INTO new_table SELECT * from old_table ORDER BY x;

I was thinking that eventually the table provider for new_table would somehow set required_input_order() to be x in this case. Perhaps something like

CREATE TABLE new_table ORDER BY x;
-- new_table would specify "required_input_order = x" 
-- properly handle merging with existing data 
INSERT INTO new_table SELECT * from old_table;

ozankabak (Contributor) commented May 16, 2023:

I agree that ordering information is specific to the target table, and ordering of an individual write does not imply an ordering of the target table on its own.

However, I think writers should preserve the ordering they receive and write in that order, if there is any. I think a user writing

INSERT INTO new_table SELECT * from old_table ORDER BY x

and seeing a different order on the table afterwards would be bad UX, and people would shoot themselves in the foot because of this.

alamb (author) replied:

I did some investigation, and I am not sure the code needs to be changed. The code in this PR achieves the desired behavior of preserving the order of the input query, as can be seen in the explain plan tests (I annotated with ****)

This query has ORDER BY c1

EXPLAIN
INSERT INTO table_without_values SELECT
SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
FROM aggregate_test_100
ORDER by c1

The plan shows the ordering is maintained:

logical_plan
Dml: op=[Insert] table=[table_without_values]
--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
InsertExec: sink=MemoryTable (partitions=1)
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
   ^******** You can see here that the data remains sorted as it is fed into the InsertExec
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]
--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }], mode=[Sorted]
----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
------------------CsvExec: file_groups={1 gr

A reviewer replied:

That's good! Given that it exhibits the expected behavior, what do you think about having maintains return true? IMO it would be good for two reasons: (1) self-documentation and consistency, (2) future-proofing -- a future optimizer will likely not break this behavior given that return value.

alamb (author) replied:

> what do you think about having maintains return true?

The reason I am pushing back on changing maintains_input_order is that I don't think the InsertExec operator preserves the input's order. Specifically, it doesn't satisfy the meaning of this setting:

https://github.com/apache/arrow-datafusion/blob/7e5c92db5f0dcc8261a27846165236c4d44460de/datafusion/core/src/physical_plan/mod.rs#L150-L165

I guess you could argue that since the output is a single row (the count) it is technically ordered in whatever way you want, but I still feel it is incorrect and unnecessary.

Maybe we can find some test / example where having maintains_input_order set to true makes a difference? If so, I will gladly change it.

ozankabak commented May 18, 2023:

I see your point; since it doesn't really "output" what it writes, your argument makes sense.

Let me try to explain what worries me, maybe you will have some ideas to resolve my concerns. Basically, I worry that one may need to add InsertExec as a special case in future rules one writes that may change sorts/orderings -- and that wouldn't be good. I will try to elaborate through an example:

Assume that an InsertExec is at the top, and there is a SortExec under it. A rule could say "The ordering imposed by this SortExec is lost after the InsertExec, and the InsertExec itself doesn't require an ordering either, so I may just remove the SortExec" -- which would be wrong.

Having said that, making maintains return true doesn't really solve this problem either -- the heart of the problem is that an InsertExec doesn't require an input ordering when considered in isolation, but it should behave as if the output ordering of its input were a sort of pseudo-requirement. We don't want to end up with a changed input ordering because optimizer rules were misled into thinking it is fair game to change it.

Do we need to guard against this, and if so, how do we guard against this?

Would adding something like

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
    vec![self
        .output_ordering()
        .map(PhysicalSortRequirement::from_sort_exprs)]
}

work?

ozankabak added, May 18, 2023:

Let's analyze a concrete case, say your example query. In that query, the ordering imposed by the SortExec in the middle of the plan is required by the BoundedWindowAggExec above it, which preserves that ordering. So far so good. However, isn't the optimizer then free to use CoalescePartitionsExec instead of SortPreservingMergeExec up top (if it thinks using the former is faster)? Where does the need to still preserve the ordering come from? I'm not sure if our optimizers are that aggressive yet, but at some point they could be.

If it did that, we'd have a problem.

PS. Is the correct plan still generated if you replace the global ORDER BY c1 with a column that is not c1 or c9? Maybe in that case this problem could manifest, but I'm not entirely sure.

alamb (author) replied May 19, 2023:

> Would adding something like

Yes, thank you @ozankabak -- that makes sense and I have made that change and added a test in f4fba65 and 7c2f189

alamb merged commit b16a403 into apache:main May 19, 2023
alamb deleted the alamb/refactor_insert branch May 19, 2023 18:27
alamb commented May 19, 2023:

Thanks again for the comments and help @mustafasrepo and @ozankabak -- I think I addressed all your comments. If there is anything still outstanding, please let me know and I will make a follow-on PR
