Conversation

@davies (Contributor) commented Jan 19, 2016

As discussed in #10786, the generated TungstenAggregate does not support imperative functions.

For the query

sqlContext.range(10).filter("id > 1").groupBy().count()

the generated code looks like:

if (!initAgg0) {
  initAgg0 = true;

  // initialize aggregation buffer
  long bufValue2 = 0L;

  // initialize Range
  if (!range_initRange5) {
    range_initRange5 = true;
    ...
  }

  while (!range_overflow8 && range_number7 < range_partitionEnd6) {
    long range_value9 = range_number7;
    range_number7 += 1L;
    if (range_number7 < range_value9 ^ 1L < 0) {
      range_overflow8 = true;
    }

    boolean primitive11 = false;
    primitive11 = range_value9 > 1L;
    if (!false && primitive11) {
      // do aggregate and update aggregation buffer
      long primitive17 = -1L;
      primitive17 = bufValue2 + 1L;
      bufValue2 = primitive17;
    }
  }

  // output the result
  bufferHolder25.reset();
  rowWriter26.initialize(bufferHolder25, 1);
  rowWriter26.write(0, bufValue2);
  result24.pointTo(bufferHolder25.buffer, bufferHolder25.totalSize());
  currentRow = result24;
  return;
}
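For reference, whole-stage codegen is gated by the spark.sql.codegen.wholeStage flag that also appears in the test snippet later in this thread. A minimal sketch (not from this PR) of enabling it and inspecting the resulting plan:

// Sketch only: the flag name is taken from the test snippet below;
// explain() prints the physical plan, where the fused operators show up.
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
sqlContext.range(10).filter("id > 1").groupBy().count().explain()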

cc @nongli

@SparkQA commented Jan 19, 2016

Test build #49720 has finished for PR 10840 at commit 006f37a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Jan 20, 2016

What's the difference between this one and #10786?

@davies (Contributor, Author) commented Jan 20, 2016

#10768 had more unrelated changes and was used for prototyping; this is the one ready for review.

A Contributor commented on the diff:

Can you add a case with multiple agg exprs?
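For illustration, a case with multiple aggregate expressions might look like this (a hypothetical test query, not taken from the PR):

import org.apache.spark.sql.functions.{count, max, sum}

// Hypothetical query: several aggregate expressions, no grouping key.
sqlContext.range(10)
  .filter("id > 1")
  .groupBy()
  .agg(count("id"), sum("id"), max("id"))
  .show()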

@nongli (Contributor) commented Jan 20, 2016

Looking good, just some minor comments.

@SparkQA commented Jan 20, 2016

Test build #49799 has finished for PR 10840 at commit ef2ddd7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BisectingKMeans @Since("2.0.0") (

@SparkQA commented Jan 20, 2016

Test build #49808 has finished for PR 10840 at commit 7ccd901.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 20, 2016

Test build #2422 has finished for PR 10840 at commit 1beb7f1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 20, 2016

Test build #49809 has finished for PR 10840 at commit 1beb7f1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor, Author) commented Jan 20, 2016

The last commit passed the tests; I'm going to merge this into master.

@asfgit closed this in b362239 on Jan 20, 2016
@rxin (Contributor) commented Jan 25, 2016

@davies it would be great if we could separate the generated code into two functions -- one that does the aggregation and the other that does the output. This way we can separate this into two "pipelines".
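A minimal, self-contained sketch of that split (assumed names, plain Scala rather than the actual generated Java):

object TwoPipelineSketch {
  private var initAgg = false
  private var bufValue = 0L

  // Pipeline 1: scan the range, apply the filter, update the buffer.
  private def doAggregate(): Unit = {
    var id = 0L
    while (id < 10L) {
      if (id > 1L) bufValue += 1L
      id += 1L
    }
  }

  // Pipeline 2: emit the single result value.
  private def doOutput(): Long = bufValue

  def processNext(): Long = {
    if (!initAgg) {
      initAgg = true
      doAggregate()
    }
    doOutput()
  }

  def main(args: Array[String]): Unit =
    println(processNext()) // prints 8, matching the example query above
}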

cc @nongli

@davies (Contributor, Author) commented Jan 25, 2016

@rxin We can do that when there is a grouping key. In this case the aggregate only outputs a single row, and it will usually be among the last few operators in the plan.
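For contrast, a hypothetical query with a grouping key (not from the PR), where the aggregate emits one row per group, so building the per-group counts and iterating them to produce output form two natural pipelines:

import org.apache.spark.sql.functions.col

sqlContext.range(10)
  .groupBy(col("id") % 2)
  .count()
  .show()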

@rxin (Contributor) commented Jan 25, 2016

Why not just do it for both cases so it is more unified? I think the point is that we'd want the generated code to reflect more accurately the number of pipelines that are actually used.

@davies (Contributor, Author) commented Jan 25, 2016

@rxin I see; it will be addressed by #10855.

      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
    val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
    df.collect()
    withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
A Contributor commented on the diff:

@davies, why this change?

@davies (Contributor, Author) replied:

In this PR, SQLMetrics are not yet supported in whole-stage codegen, so the test disables it here.
