ARROW-10043: [Rust][DataFusion] Implement COUNT(DISTINCT col) #8222
Conversation
jorgecarleitao
left a comment
I went through this PR closely, and I like it. Really good work, @drusso!
I agree with the design:
- distinct: bool on the logical plan, as part of the Expr
- a new mode for a single partition (I also saw the need for a third partition mode in #8172, which is the reason why it currently forces us to always perform 2 passes)
I think that we may be able to use GroupByKey or ScalarValue instead of the enum; I am unsure how we work out these two different expressions depending on the mode, but I think that this is the best we can do with the current API.
@drusso, it may be worth taking a look at #8172, where we are trying to improve how aggregate expressions are declared and run. Many parts of this PR conflict with that one (at the API level; all the implementation here is valid).
There are two additions in this PR that IMO should be incorporated into the API ASAP:
- The distinct "mode" of an aggregate expression
- The NoPartial/SinglePartition mode
Out of curiosity: is the DISTINCT something that is applicable to an aggregate expression in general, or is something that is only applicable to a subset of aggregations (such as count, sum)?
Wouldn't it be possible to use ScalarValue, instead of declaring a new enum?
These are used as the values in the iterator's HashSet, and unfortunately it's not possible to derive Hash for ScalarValue since it nests f32 and f64.
As you suggested, GroupByScalar is a good candidate here. (I assumed you meant GroupByScalar rather than GroupByKey?)
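(For context, a minimal sketch of the issue with a hypothetical Key enum — not the actual GroupByScalar definition: deriving Hash only works while every nested type is hashable, and f32/f64 are not.)

```rust
use std::collections::HashSet;

// Deriving Hash/Eq works while every nested type is hashable...
#[derive(PartialEq, Eq, Hash)]
enum Key {
    Int64(i64),
    Utf8(String),
    // Float64(f64), // ...but this variant would break the derive:
    //               // f64 implements neither Hash nor Eq.
}

fn main() {
    let mut distinct: HashSet<Key> = HashSet::new();
    distinct.insert(Key::Int64(1));
    distinct.insert(Key::Int64(1)); // duplicate: ignored by the set
    distinct.insert(Key::Utf8("a".to_string()));
    assert_eq!(distinct.len(), 2);
}
```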
... this way, we do not need to go ScalarValue -> DistinctScalarValue...
... and back: DistinctScalarValue -> ScalarValue
Wouldn't it be possible to implement this one here? Loop through the elements one by one.
Certainly. This path isn't reachable in the scenarios this PR addresses, but I think this will be relevant to SELECT COUNT(DISTINCT col) queries that do not include a GROUP BY.
Note that this will not apply any coercion to the arguments. We are trying to make aggregates::create_aggregate_expr be the entry point to create the physical expressions, to guarantee that the signature is correct and coercion rules apply.
However, I see that this depends on the AggregateMode, which the current create_aggregate_expr does not support. #8172 addresses exactly this. :)
Good catch, thanks for pointing that out. I'll have a look at this as soon as I integrate with #8172.
A naming idea: SinglePartition
Spark uses the term Complete. Our existing names Partial and Final were based on Spark's naming.
/**
* An [[AggregateFunction]] with [[Complete]] mode is used to evaluate this function directly
* from original input rows without any partial aggregation.
* This function updates the given aggregation buffer with the original input of this
* function. When it has processed all input rows, the final result of this function is returned.
*/
case object Complete extends AggregateMode
Both suggestions sound good. Let me know if there's a preference one way or the other. For now I will go ahead with Complete for consistency with Spark.
A more efficient implementation for distinct integers could be a bitset
Good suggestion. A couple of questions:
- Would this generalize to (or be easily adapted for) non-integers, like floats or strings?
- Is there DataFusion tooling for benchmarking different implementations?
Given that, perhaps this is an optimization we can explore at a later time?
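(For reference, a rough sketch of the bitset idea for u8 values — a hand-rolled illustration, not DataFusion code: a 256-bit set indexed directly by the value.)

```rust
/// Sketch: a fixed-size bitset tracking distinct u8 values.
struct U8DistinctSet {
    bits: [u64; 4], // 4 x 64 = 256 bits, one per possible u8 value
}

impl U8DistinctSet {
    fn new() -> Self {
        Self { bits: [0; 4] }
    }

    fn insert(&mut self, v: u8) {
        // Set the bit for value v: word v / 64, bit v % 64.
        self.bits[(v >> 6) as usize] |= 1u64 << (v & 63);
    }

    fn count(&self) -> u32 {
        self.bits.iter().map(|w| w.count_ones()).sum()
    }
}

fn main() {
    let mut set = U8DistinctSet::new();
    for &v in [1u8, 2, 2, 200, 200].iter() {
        set.insert(v);
    }
    assert_eq!(set.count(), 3);
}
```

Floats and strings would need a different structure (e.g. hashing), which is part of why a benchmark-driven follow-up makes sense.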
alamb
left a comment
I think this approach (of keeping the distinct values as a set) will work. Given that @drusso coded it up, and it has tests, I think it would be good enough to merge after addressing comments.
One potential downside of the approach in this PR is that it will likely struggle when there are a large number of distinct values per group (as the FnvHashSet will be huge), and it needs special support for each datatype.
Another approach we could consider is to use a HashAggregateExpr operation as a pre-filter to remove duplicates. The example of:
SELECT c1, COUNT(DISTINCT c2) FROM t1 GROUP BY c1
Could be expressed as a (LogicalPlan) like the following (no physical plan changes needed):
HashAggregateExec: // this second phase then counts
group_expr:
Column(c1)
aggr_expr:
CountReduce(Column(c2))
input:
HashAggregateExec: // this first agg expr finds all distinct values of (c1,c2)
group_expr:
Column(c1), Column(c2)
input:
CsvExec:
The primary advantage of this strategy is that it can take advantage of both the existing GroupBy machinery as well as any future enhancements that are made. For example, it would handle all datatypes without any special-case code today. Also, if we added any other group-by types (e.g. GroupByMerge, if the data was sorted by group keys), those could also be used for distinct queries.
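To make the rewrite concrete, the plan above corresponds roughly to the following SQL (an equivalent-rewrite sketch using the table and columns from the example, not text from the PR):

```sql
-- The inner GROUP BY deduplicates (c1, c2) pairs; the outer query then
-- counts the surviving c2 values per c1, i.e. COUNT(DISTINCT c2).
SELECT c1, COUNT(c2)
FROM (SELECT c1, c2 FROM t1 GROUP BY c1, c2) AS distinct_pairs
GROUP BY c1;
```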
nit: I'd prefer that we use expect rather than unwrap so we can add a meaningful message.
Sounds good, will update.
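(For illustration, the style change being requested — a generic sketch, not the actual call site in this PR:)

```rust
use arrow::array::{Array, Int32Array};

fn first_value(array: &dyn Array) -> i32 {
    // `expect` documents the invariant and shows up in the panic
    // message; a bare `unwrap` panics with no context.
    array
        .as_any()
        .downcast_ref::<Int32Array>()
        .expect("first_value requires an Int32Array")
        .value(0)
}
```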
Thanks @drusso this looks great. I agree with Jorge's comments about ScalarValue and I also added some minor comments.
Thanks for the review/feedback all!
I will get the changes here updated and integrated with #8172. It looks like they've already landed 👍
To my knowledge […]. (Also note that […])
Agreed. I'm happy to continue iterating and improving on the work here.
I had this thought as well, however – and correct me if I'm wrong – it doesn't generalize to a scenario like: […]. For the […]
@drusso I think you are correct that we would need a separate group by operator for each count distinct and then combine them together: so […]. Or something. I like your suggestion to get an implementation in (this one) and then iterate as needed.
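(For intuition, one way such a combination could look for two distinct aggregates — a hypothetical sketch, not the plan elided from this comment:)

```sql
-- Hypothetical: SELECT c1, COUNT(DISTINCT c2), COUNT(DISTINCT c3) FROM t1 GROUP BY c1
-- needs one deduplicating pre-aggregation per distinct column, joined on c1.
SELECT a.c1, a.cnt_c2, b.cnt_c3
FROM (
    SELECT c1, COUNT(c2) AS cnt_c2
    FROM (SELECT c1, c2 FROM t1 GROUP BY c1, c2) AS d2
    GROUP BY c1
) AS a
JOIN (
    SELECT c1, COUNT(c3) AS cnt_c3
    FROM (SELECT c1, c3 FROM t1 GROUP BY c1, c3) AS d3
    GROUP BY c1
) AS b ON a.c1 = b.c1;
```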
Hi @drusso I would like to review and merge this DF PR next. Would you mind rebasing?
@andygrove (cc @jorgecarleitao): My apologies, I don't have the changes ready yet. Though I did have some time today to look into integrating this with #8172/master. I'm still digesting the upstream changes, and I see accumulators now have partial "state" that they emit and ingest across stages. This is great, and I think it would work well for a distinct count accumulator implementation. However, I think there's a small roadblock with porting the current implementation. I believe the approach would be to implement […]

As an example, given a table: […] And given a query: […] Then the set of output […]

Assuming this is all looking correct so far, then the issue is […]

There are a number of paths forward. Brainstorming some solutions:
enum Value {
Single(ScalarValue),
Multi(Vec<ScalarValue>),
}

Let me know what you think and how we can proceed. Thanks!
I agree with your reasoning. AFAIK, […]. With this in mind, another idea is to map your second idea to: […]
I think that this was the original intention of those structures in the Arrow specs: to compose complex structures. This was also the reasoning behind emitting […]. This would keep us using Arrow as the medium of holding and transmitting (immutable) data, and would keep the execution's schema a bit more predictable.
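(A simplified sketch of the composition idea — not DataFusion's actual ScalarValue, which also carries a DataType alongside the list:)

```rust
// A scalar variant can itself hold a list of scalars, so a whole
// set of distinct values can travel as one ScalarValue.
#[derive(Clone, Debug)]
enum ScalarValue {
    Int32(Option<i32>),
    Utf8(Option<String>),
    List(Option<Vec<ScalarValue>>),
}

fn main() {
    // Accumulator state for one group: the distinct values seen so far.
    let state = ScalarValue::List(Some(vec![
        ScalarValue::Int32(Some(1)),
        ScalarValue::Int32(Some(3)),
    ]));
    println!("{:?}", state);
}
```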
Thanks for clarifying Arrow scalars, I was under the wrong impression they were strictly for primitive types and not composite types. An Arrow scalar as a single item/element in any […]. With that in mind, and following the lead from pyarrow, the corresponding scalar variants would be: […]

In the scope of this pull request, I would need to add […]. Let me know if that sounds good and I can proceed.
Sounds good to me 🚀
Just a quick update here, I'll have some time today and over the weekend to work through the changes.
I've rebased against the latest changes on the master branch. Some notes and questions about the changes: […]

Let me know what you think and if any changes are needed. Thanks!
jorgecarleitao
left a comment
I went through this. Really good work, @drusso. The design and implementation look good. My main comments revolve around tests, as IMO we need a bit more coverage.
IMO it is ok to push to both arrow and datafusion in a single PR. Adding kernels to the arrow project is a good thing IMO, so that consumers of arrow (but not datafusion) also benefit from them. If needed, we can create a separate PR for it also. Feel free to add take for LargeList to Jira, in case someone feels motivated to work on it.
I do not think that there is an implementation of Vec<Vec<_>> to List yet, no. :(
I think that it is fine keeping ListArray for now.
The state of DistinctCountAccumulator uses one field per input argument. [...]
I agree, and I think that we may be able to place all states in a single StructArray, which would simplify things. The trade-off is the vectorization: I imagine we benefit when merge_batch uses an existing horizontal operation, and having common, aligned types helps.
As discussed, I've replaced DistinctScalarValue with GroupByScalar. To do so, I promoted GroupByScalar to its own module.
👍
impl Accumulator for DistinctCountAccumulator {
    fn update_batch(&mut self, arrays: &Vec<ArrayRef>) -> Result<()> {
this expression seems similar to the default update_batch (implemented by the trait Accumulator). Do you think there's a way to generalize the default to take this Accumulator into account, so that we can DRY some code? (same for merge_batch)
I will take a look and get back to you sometime today or tomorrow.
The update_batch() implementations were actually equivalent, so I removed the specialized one. See 3c87e89.
There's also a way to remove the specialized merge_batch() implementation if merge() is implemented as:
fn merge(&mut self, states: &Vec<ScalarValue>) -> Result<()> {
if states.len() == 0 {
return Ok(());
}
let col_values = states
.iter()
.map(|state| match state {
ScalarValue::List(Some(values), _) => Ok(values),
_ => Err(ExecutionError::InternalError(
"Unexpected accumulator state".to_string(),
)),
})
.collect::<Result<Vec<_>>>()?;
(0..col_values[0].len())
.map(|row_index| {
let row_values = col_values
.iter()
.map(|col| col[row_index].clone())
.collect::<Vec<_>>();
self.update(&row_values)
})
.collect::<Result<_>>()
}

I think that works out well. I don't have any preference one way or the other, should I make the swap?
That is great! Less code for the update :)
For the merge: if they are equivalent speed-wise, I think that it would make it a bit easier to understand: this is how we merge a single element; this is how we update a single element. But both are LGTM for me.
I made the switch (see d61ec97). For now I think the DRY approach makes sense, and as a follow-up I can explore how the different options perform. I can use benches/aggregate_query_sql.rs as a starting point for the comparisons, if that makes sense?
Regarding the impact of d61ec97:
I used the benchmarks in #8606 to compare 57893b4 vs. 57893b4 + the following patch, and there was indeed no noticeable difference between the implementations 👍
diff --git a/rust/datafusion/src/physical_plan/distinct_expressions.rs b/rust/datafusion/src/physical_plan/distinct_expressions.rs
index c2183ca3b..d7745b0fa 100644
--- a/rust/datafusion/src/physical_plan/distinct_expressions.rs
+++ b/rust/datafusion/src/physical_plan/distinct_expressions.rs
@@ -23,6 +23,7 @@ use std::hash::Hash;
use std::sync::Arc;
use arrow::datatypes::{DataType, Field};
+use arrow::array::{ArrayRef, ListArray};
use fnv::FnvHashSet;
@@ -123,29 +124,27 @@ impl Accumulator for DistinctCountAccumulator {
}
fn merge(&mut self, states: &Vec<ScalarValue>) -> Result<()> {
- if states.len() == 0 {
- return Ok(());
- }
+ self.update(states)
+ }
- let col_values = states
+ fn merge_batch(&mut self, states: &Vec<ArrayRef>) -> Result<()> {
+ let list_arrays = states
.iter()
- .map(|state| match state {
- ScalarValue::List(Some(values), _) => Ok(values),
- _ => Err(DataFusionError::Internal(
- "Unexpected accumulator state".to_string(),
- )),
+ .map(|state_array| {
+ state_array.as_any().downcast_ref::<ListArray>().ok_or(
+ DataFusionError::Internal(
+ "Failed to downcast ListArray".to_string(),
+ ),
+ )
})
.collect::<Result<Vec<_>>>()?;
- (0..col_values[0].len())
- .map(|row_index| {
- let row_values = col_values
- .iter()
- .map(|col| col[row_index].clone())
- .collect::<Vec<_>>();
- self.update(&row_values)
- })
- .collect::<Result<_>>()
+ let values_arrays = list_arrays
+ .iter()
+ .map(|list_array| list_array.values())
+ .collect::<Vec<_>>();
+
+ self.update_batch(&values_arrays)
}
fn state(&self) -> Result<Vec<ScalarValue>> {
.downcast_ref::<array::StringArray>()
.unwrap()
.value(row_index),
),
I think that we are also missing a test in this module.
See 15e6f0b.
I am really sorry, I misread this. What I meant was not to test the format_batch function, but to test count distinct in integration.
However, I see that this was not the module I thought it was. The code in this module is perfect as is now.
I meant adding a test here, which is where we perform integration tests.
Something along the lines of:
#[tokio::test]
async fn query_count_distinct() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Int32, true),
]));
let data = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![Some(0), Some(1), None, Some(3), Some(3)])),
],
)?;
let table = MemTable::new(schema, vec![vec![data]])?;
let mut ctx = ExecutionContext::new();
ctx.register_table("test", Box::new(table));
let sql = "SELECT COUNT(DISTINCT c1) FROM test";
let actual = execute(&mut ctx, sql).await;
let expected = vec![
vec!["0", "1"],
vec!["1", "1"],
vec!["3", "2"],
];
assert_eq!(expected, actual);
Ok(())
}

does this make sense?
No problem!
This makes perfect sense, and I added the test (2d0999a). However, I also had to make the count(distinct) field nullable (4cdb951), otherwise I ran into an assertion error here due to a mismatch of the field's nullability between the logical plan and physical plan.
The logical plan nullability is set here, and is always nullable. All of the regular aggregate expressions mark their field as nullable, for example, here for the regular Count.
I might be mistaken, but I think regular and distinct counts should be non-nullable? In any case, I went with making count(distinct) nullable for consistency with count(). Perhaps there's a follow-up here?
I think that count(distinct) should be null when all entries are null, for consistency with count. I do not think that it blocks this, though. It is an edge case IMO.
jorgecarleitao
left a comment
LGTM. Really great work, @drusso: the implementation, the approach to the problem, and the follow-up through to completion.
Thanks @jorgecarleitao! And also, thank you for taking the time for the thorough review. This was a fun feature to work on.
[ARROW-10510](https://issues.apache.org/jira/browse/ARROW-10510) This change adds benchmarks for `COUNT(DISTINCT)` queries. This is a small follow-up to [ARROW-10043](https://issues.apache.org/jira/browse/ARROW-10043) / #8222. In that PR, a number of implementation ideas were discussed for follow-ups, and having benchmarks will help evaluate them.

There are two benchmarks added:
* wide: all of the values are distinct; this is looking at worst-case performance
* narrow: only a handful of distinct values; this is closer to best-case performance

The wide benchmark runs ~7x slower than the narrow benchmark.

Closes #8606 from drusso/ARROW-10510

Authored-by: Daniel Russo <[email protected]>
Signed-off-by: Neville Dipale <[email protected]>
This is a proposal for an initial and partial implementation of the DISTINCT keyword. Only COUNT(DISTINCT) is supported, with the following conditions:

(a) only one argument, i.e. COUNT(DISTINCT col), but not COUNT(DISTINCT col, other),
(b) the argument is an integer type, and
(c) the query must have a GROUP BY clause.

Implementation Overview:

The Expr::AggregateFunction variant has a new field, distinct, which mirrors the distinct flag from SQLExpr::Function (up until now this flag was unused). Any Expr::AggregateFunction may have its distinct flag switched to true if the keyword is present in the SQL query. However, the physical planner respects it only for COUNT expressions.
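(A simplified sketch of the logical-plan change — a hypothetical, trimmed-down Expr; the real enum in DataFusion has many more variants and a different field layout:)

```rust
// Hypothetical, trimmed-down Expr illustrating the new `distinct` flag.
#[allow(dead_code)]
enum Expr {
    AggregateFunction {
        name: String,   // e.g. "COUNT"
        args: Vec<Expr>,
        distinct: bool, // mirrors the SQL DISTINCT keyword
    },
    Column(String),
    // ...other variants elided
}

fn main() {
    // What the SQL planner would build for COUNT(DISTINCT col):
    let _count_distinct = Expr::AggregateFunction {
        name: "COUNT".to_string(),
        args: vec![Expr::Column("col".to_string())],
        distinct: true,
    };
}
```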
The count distinct aggregation slots into the existing physical plans as a new set of AggregateExpr. To demonstrate, below are examples of the physical plans for the following query, where c1 may be any data type, and c2 is a UInt8 column: […]

(a) Multiple Partitions:

The DistinctCount accumulates each UInt8 into a list of distinct UInt8 values. No counts are collected yet; this is a partial result: lists of distinct values. In the RecordBatch, this is a LargeListArray<UInt8> column. After the MergeExec, each list in the LargeListArray<UInt8> is accumulated by DistinctCountReduce (via accumulate_batch()), producing the final sets of distinct values. Finally, given the finalized sets of distinct values, the counts are computed (always as UInt64).
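(An illustrative sketch of this two-stage plan, reconstructed from the description above rather than copied from the PR:)

```
HashAggregateExec (mode: Final)          // DistinctCountReduce merges the
  group_expr: c1                         // partial lists, then computes the
  aggr_expr: DistinctCountReduce(c2)     // UInt64 counts
  MergeExec                              // gathers all partitions into one
    HashAggregateExec (mode: Partial)    // DistinctCount builds per-group
      group_expr: c1                     // LargeListArray<UInt8> of
      aggr_expr: DistinctCount(c2)       // distinct values
      input (one per partition, e.g. CsvExec)
```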
(b) Single Partition:

This scenario is unlike the multiple partition scenario: DistinctCount is not used, and there are no partial sets of distinct values. Rather, in a single HashAggregateExec stage, each UInt8 is accumulated into a distinct value set, then the counts are computed at the end of the stage. DistinctCountReduce is used, but note that unlike the multiple partition case, it accumulates scalars via accumulate_scalar().

There is a new aggregation mode: NoPartial. In summary, the modes are:

- NoPartial: used in single-stage aggregations
- Partial: used as the first stage of two-stage aggregations
- Final: used as the second stage of two-stage aggregations

Prior to the new NoPartial mode, Partial was handling both of what are now the responsibilities of Partial and NoPartial. No distinction was required, because non-distinct aggregations (such as count, sum, min, max, and avg) do not need the distinction: the first aggregation stage is always the same, regardless of whether the aggregation is one-stage or two-stage. This is not the case for a distinct count aggregation, and we can see that in the physical plans above.
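(A minimal sketch of the resulting mode enum, assuming the three variants named above; the PR's actual definition may differ in derives and details:)

```rust
/// Sketch of the three aggregation modes described above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggregateMode {
    /// Single-stage aggregation: accumulate raw input and finalize in one pass.
    NoPartial,
    /// First stage of a two-stage aggregation: emit partial state.
    Partial,
    /// Second stage of a two-stage aggregation: merge partial states, finalize.
    Final,
}

fn main() {
    // A single-partition COUNT(DISTINCT) plan runs one NoPartial stage;
    // a multi-partition plan runs Partial stages followed by a Final stage.
    let single = AggregateMode::NoPartial;
    assert_ne!(single, AggregateMode::Partial);
}
```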