
@HeartSaVioR
Contributor

@HeartSaVioR HeartSaVioR commented Sep 18, 2019

What changes were proposed in this pull request?

This patch fixes an issue introduced by SPARK-21870: when generating code for a parameter type, the array case of javaType is not handled. There is at least one such case: Spark should generate the parameter type for BinaryType as byte[], but it generates [B instead, so the generated code fails to compile.

Below is the generated code that failed compilation (line 380):

/* 380 */   private void agg_doAggregate_count_0([B agg_expr_1_1, boolean agg_exprIsNull_1_1, org.apache.spark.sql.catalyst.InternalRow agg_unsafeRowAggBuffer_1) throws java.io.IOException {
/* 381 */     // evaluate aggregate function for count
/* 382 */     boolean agg_isNull_26 = false;
/* 383 */     long agg_value_28 = -1L;
/* 384 */     if (!false && agg_exprIsNull_1_1) {
/* 385 */       long agg_value_31 = agg_unsafeRowAggBuffer_1.getLong(1);
/* 386 */       agg_isNull_26 = false;
/* 387 */       agg_value_28 = agg_value_31;
/* 388 */     } else {
/* 389 */       long agg_value_33 = agg_unsafeRowAggBuffer_1.getLong(1);
/* 390 */
/* 391 */       long agg_value_32 = -1L;
/* 392 */
/* 393 */       agg_value_32 = agg_value_33 + 1L;
/* 394 */       agg_isNull_26 = false;
/* 395 */       agg_value_28 = agg_value_32;
/* 396 */     }
/* 397 */     // update unsafe row buffer
/* 398 */     agg_unsafeRowAggBuffer_1.setLong(1, agg_value_28);
/* 399 */   }
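The root cause is how the JVM names array classes: for an array type, Class.getName returns the internal descriptor ("[B" for byte[]), which is not valid Java source when spliced in as a parameter type. A minimal self-contained sketch of the repair (illustrative, not the exact patch in this PR) builds the source-level name recursively:

```scala
object TypeNameDemo {
  // Class.getName yields JVM descriptors for array classes ("[B", "[[J", ...),
  // which do not compile when used as a type in generated Java source.
  // Recursively unwrapping the component type produces the source-level
  // name instead ("byte[]", "long[][]").
  def typeName(clazz: Class[_]): String =
    if (clazz.isArray) typeName(clazz.getComponentType) + "[]"
    else clazz.getName

  def main(args: Array[String]): Unit = {
    println(classOf[Array[Byte]].getName)   // prints "[B"
    println(typeName(classOf[Array[Byte]])) // prints "byte[]"
  }
}
```

Per the review discussion below, a helper along these lines belongs in CodeGenerator so all codegen paths share one source-level type-name function.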

There was no test for HashAggregateExec that specifically covered this, but the randomized test in ObjectHashAggregateSuite could hit it, which is why ObjectHashAggregateSuite is flaky.

Why are the changes needed?

Without the fix, generated code from HashAggregateExec may fail compilation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new unit test. Without the fix, the newly added test fails.

@HeartSaVioR
Contributor Author

cc. @maropu @cloud-fan @viirya

@HeartSaVioR
Contributor Author

HeartSaVioR commented Sep 18, 2019

Btw, I had to modify some code locally to capture the generated code for the failing case. We currently log generated code at INFO level, but for CI builds it might be better to log it at ERROR level when a test fails for this reason.
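A sketch of what that suggestion could look like (the compile hook and log callback here are hypothetical stand-ins, not Spark's actual codegen or logging API): keep generated source at INFO on success, but emit it at ERROR when compilation fails so the CI log carries enough context to diagnose the failure.

```scala
object CodegenLogging {
  // Hypothetical compile step: returns an error message on failure.
  // (Stands in for the real compilation of generated code inside Spark.)
  def compile(source: String): Either[String, Unit] =
    if (source.contains("[B ")) Left("invalid parameter type '[B'")
    else Right(())

  // Log the generated source at ERROR only when compilation fails, so a
  // CI build failure includes the code needed to reproduce and debug it.
  def compileWithLogging(source: String, logError: String => Unit): Boolean =
    compile(source) match {
      case Right(()) => true
      case Left(err) =>
        logError(s"codegen compilation failed: $err\n$source")
        false
    }
}
```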

import testImplicits._

test("SPARK-29140 HashAggregateExec aggregating binary type doesn't break codegen compilation") {
val withDistinct = countDistinct($"c1")
Member

Move to AggregationQuerySuite?

Contributor Author

Thanks, I didn't know that suite existed. Will move.

.groupBy($"id" % 10 as "group")
.agg(withDistinct)
.orderBy("group")
aggDf.collect().toSeq
Member

Please check the result.

""".stripMargin
}

private def typeNameForCodegen(clazz: Class[_]): String = {
Member

It might be better to move this helper function to CodeGenerator.

Member

and just call typeName?

Contributor Author

Nice suggestions in both comments! Will address.

@maropu
Member

maropu commented Sep 18, 2019

In the PR title, array types is more obvious than binary types?

@SparkQA

SparkQA commented Sep 18, 2019

Test build #110908 has finished for PR 25830 at commit 9c76557.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

array types is more obvious than binary types?

array type is ArrayData, which doesn't have this issue.

@HeartSaVioR
Contributor Author

array types is more obvious than binary types?

array type is ArrayData, which doesn't have this issue.

Yes, exactly. I suspected ArrayType at first, but realized its javaType is not a Java array type. So for now, only BinaryType hits the issue, if I understand correctly. Maybe "array types in javaType" would be more obvious.

@HeartSaVioR HeartSaVioR changed the title [SPARK-29140][SQL] Handle BinaryType of parameter properly in HashAggregateExec [SPARK-29140][SQL] Handle parameters having "array" of javaType properly in HashAggregateExec Sep 18, 2019
@HeartSaVioR
Contributor Author

Updated; please take another round of review. Thanks!

val withDistinct = countDistinct($"c1")

val schema = new StructType().add("c1", BinaryType, nullable = true)
val schemaWithId = StructType(StructField("id", IntegerType, nullable = false) +: schema.fields)
Member

@viirya viirya Sep 18, 2019

nit: curious why you don't just have schema? schema is not used anywhere else.

Contributor Author

@HeartSaVioR HeartSaVioR Sep 18, 2019

Ah, I missed that. I copied the test code from ObjectHashAggregateSuite (since the test actually failed there) and tried to minimize the reproduction and clean it up, but missed this. Thanks for pointing it out!

@maropu
Member

maropu commented Sep 18, 2019

There wasn't any test for HashAggregateExec specifically testing this, but randomized test in ObjectHashAggregateSuite could encounter this and that's why ObjectHashAggregateSuite is flaky.

Oh... that's my fault. Anyway, nice catch!

@maropu
Member

maropu commented Sep 18, 2019

btw, agg_expr_1_1 is not used inside the aggregation function. Actually, can aggregation functions in HashAggregateExec only process fixed-length typed data and decimal values?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L1126
If so, I think we should filter these complex types out of the arguments of aggregate functions.

@viirya
Member

viirya commented Sep 18, 2019

btw, agg_expr_1_1 is not used inside the aggregation function. Actually, in HashAggregateExec, aggregation functions can only process fixed-length typed data and decimal values?

Oh, this is a good point. I think it is only stated that an aggregate function's buffer attributes must be of certain types. Aggregate functions can still have inputs of complex data types?

However, I don't know if there are existing aggregate functions in Spark that take complex inputs but use buffer attributes that are acceptable for HashAggregateExec.

I just think that, in theory, we might have such aggregate functions?

@HeartSaVioR
Contributor Author

Btw, I submitted patch #25835 as a follow-up to my own comment: #25830 (comment)

Please review if it makes sense. Thanks!

@HeartSaVioR
Contributor Author

And another possible improvement for the randomized test in ObjectHashAggregateSuite: how about logging the selected parameters at WARN/ERROR, or including them in the hint message on the assert? I actually had to modify the test to rerun the test code until it failed, since some information is provided in the test name but other details, like the schema, are not.
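A minimal sketch of the assert-hint idea (names and structure here are illustrative, not the actual ObjectHashAggregateSuite code, which picks its schemas and functions randomly per run): attach the chosen parameters to the assertion message so a flaky failure is reproducible from the CI log alone.

```scala
object RandomizedClue {
  // Illustrative stand-ins for parameters a randomized test might choose.
  final case class Params(seed: Long, schemaDdl: String, numRows: Int)

  // Render the chosen parameters as a single diagnostic string.
  def clue(p: Params): String =
    s"seed=${p.seed}, schema=[${p.schemaDdl}], numRows=${p.numRows}"

  // Fold the clue into any assertion failure so the failing configuration
  // can be reconstructed without rerunning the test locally.
  def checkWithClue(p: Params)(condition: Boolean): Unit =
    assert(condition, s"randomized test failed with ${clue(p)}")
}
```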

@maropu
Member

maropu commented Sep 19, 2019

Oh, this is good point. I think it is only said that aggregate function's buffer attributes can only be in certain types. Aggregate functions still can have inputs of complex data type?
However I don't know if there are existing aggregate functions in Spark take complex inputs but use buffer attributes that are acceptable for HashAggregateExec.
Just think in theory we might have such aggregate functions?

Might be so (IIUC we have no restriction about that; buffers should have the same type as the input data). So, as a safeguard, how about turning off the split mode instead of forcibly passing complex data into the split functions?

@SparkQA

SparkQA commented Sep 19, 2019

Test build #110938 has finished for PR 25830 at commit 7c66afc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 19, 2019

Test build #110934 has finished for PR 25830 at commit 5ffa7e7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor Author

Just to determine the next action: would we want to include the newer discussion (which @viirya and @maropu are having) in the scope of this PR?

@viirya
Member

viirya commented Sep 20, 2019

As I said, I think it is possible for an aggregate function to access complex input data, like an array, while using a buffer attribute that is supported by HashAggregateExec.

If we just filter complex data types out, the split function for such an aggregate function won't work.

So currently this looks good to me.

I am not sure if we want to turn off split mode just because of an array argument, as @maropu suggested. cc @cloud-fan

@cloud-fan
Contributor

I think the added test demonstrates well that there are agg functions that take complex input and use simple buffers.

val emptyRows = spark.sparkContext.parallelize(Seq.empty[Row], 1)
val aggDf = spark.createDataFrame(emptyRows, schema)
.groupBy($"id" % 10 as "group")
.agg(withDistinct)
Contributor

nit: we can simply put countDistinct($"c1") here.

val aggDf = spark.createDataFrame(emptyRows, schema)
.groupBy($"id" % 10 as "group")
.agg(withDistinct)
.orderBy("group")
Member

Do we need .orderBy for this test?

Contributor Author

That can be removed. Will remove.

@SparkQA

SparkQA commented Sep 20, 2019

Test build #111059 has finished for PR 25830 at commit ce2b17f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Sep 20, 2019

retest this please

@SparkQA

SparkQA commented Sep 20, 2019

Test build #111065 has finished for PR 25830 at commit ce2b17f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

test("SPARK-29122: hash-based aggregates for unfixed-length decimals in the interpreter mode") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString) {
SQLConf.CODEGEN_FACTORY_MODE.key -> CodegenObjectFactoryMode.NO_CODEGEN.toString) {
Member

Actually, I think the previous indentation is correct...

Contributor Author

Yeah, the IDE automatically re-indented it while I was fixing conflicts. Will roll back.

@SparkQA

SparkQA commented Sep 20, 2019

Test build #111089 has finished for PR 25830 at commit 28726da.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu changed the title [SPARK-29140][SQL] Handle parameters having "array" of javaType properly in HashAggregateExec [SPARK-29140][SQL] Handle parameters having "array" of javaType properly in splitAggregateExpressions Sep 20, 2019
@maropu
Member

maropu commented Sep 20, 2019

I modified the title a bit.

@SparkQA

SparkQA commented Sep 21, 2019

Test build #111096 has finished for PR 25830 at commit 4c00a2b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu closed this in f7cc695 Sep 21, 2019
@maropu
Member

maropu commented Sep 21, 2019

Thanks, all! Merged to master.

@HeartSaVioR
Contributor Author

Thanks all for reviewing and merging!

@HeartSaVioR HeartSaVioR deleted the SPARK-29140 branch September 21, 2019 08:07