Skip to content

Conversation

@davies
Copy link
Contributor

@davies davies commented Jan 21, 2016

This PR add support for grouping keys for generated TungstenAggregate.

Spilling and performance improvements for BytesToBytesMap will be done by followup PR.

@SparkQA
Copy link

SparkQA commented Jan 21, 2016

Test build #49829 has finished for PR 10855 at commit 3e792f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator

@davies davies force-pushed the gen_keys branch 2 times, most recently from ca9a772 to a98bc05 Compare January 21, 2016 03:00
@SparkQA
Copy link

SparkQA commented Jan 21, 2016

Test build #49847 has finished for PR 10855 at commit a98bc05.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 21, 2016

Test build #49855 has finished for PR 10855 at commit 7d1bd43.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Jan 21, 2016

@nongli @rxin This PR is ready for review now.

@SparkQA
Copy link

SparkQA commented Jan 21, 2016

Test build #49862 has finished for PR 10855 at commit 7880786.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 26, 2016

Test build #50060 has finished for PR 10855 at commit 9cc7925.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator

@SparkQA
Copy link

SparkQA commented Jan 26, 2016

Test build #50063 has finished for PR 10855 at commit 9a42b52.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment.

Davies Liu added 2 commits January 26, 2016 22:57
Conflicts:
	sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@davies
Copy link
Contributor Author

davies commented Jan 27, 2016

@nongli I had pulled out the unrelated changes into #10944, we should review and merge that one first.

@SparkQA
Copy link

SparkQA commented Jan 27, 2016

Test build #50178 has finished for PR 10855 at commit 48e125c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@SparkQA
Copy link

SparkQA commented Jan 28, 2016

Test build #50307 has finished for PR 10855 at commit efe7fa2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #50312 has finished for PR 10855 at commit 3bfdeb2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Davies Liu added 2 commits January 29, 2016 10:09
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #50389 has finished for PR 10855 at commit 858c1e3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class LinearRegressionModel(JavaModel, MLWritable, MLReadable):
    • class JavaMLWriter(object):
    • class MLWritable(object):
    • class JavaMLReader(object):
    • java_class = cls._java_loader_class(clazz)
    • class MLReadable(object):
    • case class SetDatabaseCommand(databaseName: String) extends RunnableCommand

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it weird you've split this up this way. Can you comment that this is only used when its grouping with no agg?

Why do you manage this in side the TungstenAggregation class but create the hashmap so differently? They seem logically serving the same purpose.

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #2475 has finished for PR 10855 at commit d3c2406.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Copy link
Contributor

nongli commented Jan 29, 2016

@davies Can you include the generated output?

Davies Liu added 2 commits January 29, 2016 12:41
@davies
Copy link
Contributor Author

davies commented Jan 29, 2016

For query

sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").count().count()

Will generate the code as following:

/* 001 */
/* 002 */ public Object generate(Object[] references) {
/* 003 */   return new GeneratedIterator(references);
/* 004 */ }
/* 005 */
/* 006 */ class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */
/* 008 */   private Object[] references;
/* 009 */   private boolean TungstenAggregate_initAgg0;
/* 010 */   private org.apache.spark.sql.execution.aggregate.TungstenAggregate TungstenAggregate_plan1;
/* 011 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap TungstenAggregate_hashMap2;
/* 012 */   private org.apache.spark.unsafe.KVIterator TungstenAggregate_mapIter3;
/* 013 */   private UnsafeRow TungstenAggregate_result10;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder TungstenAggregate_holder11;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter TungstenAggregate_rowWriter12;
/* 016 */   private boolean Range_initRange15;
/* 017 */   private long Range_partitionEnd16;
/* 018 */   private long Range_number17;
/* 019 */   private boolean Range_overflow18;
/* 020 */   private UnsafeRow TungstenAggregate_result28;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder TungstenAggregate_holder29;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter TungstenAggregate_rowWriter30;
/* 023 */
/* 024 */   private void TungstenAggregate_doAggregateWithKeys14() throws java.io.IOException {
/* 025 */
/* 026 */     // initialize Range
/* 027 */     if (!Range_initRange15) {
/* 028 */       Range_initRange15 = true;
/* 029 */       if (input.hasNext()) {
/* 030 */         initRange(((InternalRow) input.next()).getInt(0));
/* 031 */       } else {
/* 032 */         return;
/* 033 */       }
/* 034 */     }
/* 035 */
/* 036 */     while (!Range_overflow18 && Range_number17 < Range_partitionEnd16) {
/* 037 */       long Range_value19 = Range_number17;
/* 038 */       Range_number17 += 1L;
/* 039 */       if (Range_number17 < Range_value19 ^ 1L < 0) {
/* 040 */         Range_overflow18 = true;
/* 041 */       }
/* 042 */
/* 043 */       /* (input[0, bigint] & 65535) */
/* 044 */       /* input[0, bigint] */
/* 045 */
/* 046 */       /* 65535 */
/* 047 */
/* 048 */       long Project_value21 = -1L;
/* 049 */       Project_value21 = Range_value19 & 65535L;
/* 050 */
/* 051 */
/* 052 */       // generate grouping key
/* 053 */
/* 054 */
/* 055 */
/* 056 */       /* input[0, bigint] */
/* 057 */
/* 058 */       TungstenAggregate_rowWriter30.write(0, Project_value21);
/* 059 */
/* 060 */
/* 061 */       UnsafeRow TungstenAggregate_aggBuffer32 = TungstenAggregate_hashMap2.getAggregationBufferFromUnsafeRow(TungstenAggregate_result28);
/* 062 */       if (TungstenAggregate_aggBuffer32 == null) {
/* 063 */         // failed to allocate the first page
/* 064 */         throw new OutOfMemoryError("No enough memory for aggregation");
/* 065 */       }
/* 066 */
/* 067 */       // evaluate aggregate function
/* 068 */
/* 069 */       // update aggregate buffer
/* 070 */
/* 071 */
/* 072 */
/* 073 */     }
/* 074 */
/* 075 */
/* 076 */     TungstenAggregate_mapIter3 = TungstenAggregate_hashMap2.iterator();
/* 077 */   }
/* 078 */
/* 079 */
/* 080 */   private void initRange(int idx) {
/* 081 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 082 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 083 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(20971520L);
/* 084 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 085 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 086 */
/* 087 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 088 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 089 */       Range_number17 = Long.MAX_VALUE;
/* 090 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 091 */       Range_number17 = Long.MIN_VALUE;
/* 092 */     } else {
/* 093 */       Range_number17 = st.longValue();
/* 094 */     }
/* 095 */
/* 096 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 097 */     .multiply(step).add(start);
/* 098 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 099 */       Range_partitionEnd16 = Long.MAX_VALUE;
/* 100 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 101 */       Range_partitionEnd16 = Long.MIN_VALUE;
/* 102 */     } else {
/* 103 */       Range_partitionEnd16 = end.longValue();
/* 104 */     }
/* 105 */   }
/* 106 */
/* 107 */
/* 108 */   public GeneratedIterator(Object[] references) {
/* 109 */     this.references = references;
/* 110 */     TungstenAggregate_initAgg0 = false;
/* 111 */     this.TungstenAggregate_plan1 = (org.apache.spark.sql.execution.aggregate.TungstenAggregate) references[0];
/* 112 */     TungstenAggregate_hashMap2 = TungstenAggregate_plan1.createHashMap();
/* 113 */
/* 114 */     TungstenAggregate_result10 = new UnsafeRow(1);
/* 115 */     this.TungstenAggregate_holder11 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(TungstenAggregate_result10, 0);
/* 116 */     this.TungstenAggregate_rowWriter12 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(TungstenAggregate_holder11, 1);
/* 117 */     Range_initRange15 = false;
/* 118 */     Range_partitionEnd16 = 0L;
/* 119 */     Range_number17 = 0L;
/* 120 */     Range_overflow18 = false;
/* 121 */     TungstenAggregate_result28 = new UnsafeRow(1);
/* 122 */     this.TungstenAggregate_holder29 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(TungstenAggregate_result28, 0);
/* 123 */     this.TungstenAggregate_rowWriter30 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(TungstenAggregate_holder29, 1);
/* 124 */   }
/* 125 */
/* 126 */   protected void processNext() throws java.io.IOException {
/* 127 */
/* 128 */     if (!TungstenAggregate_initAgg0) {
/* 129 */       TungstenAggregate_initAgg0 = true;
/* 130 */       TungstenAggregate_doAggregateWithKeys14();
/* 131 */     }
/* 132 */
/* 133 */     // output the result
/* 134 */     while (TungstenAggregate_mapIter3.next()) {
/* 135 */       UnsafeRow TungstenAggregate_aggKey4 = (UnsafeRow) TungstenAggregate_mapIter3.getKey();
/* 136 */       UnsafeRow TungstenAggregate_aggBuffer5 = (UnsafeRow) TungstenAggregate_mapIter3.getValue();
/* 137 */
/* 138 */       /* input[0, bigint] */
/* 139 */       long TungstenAggregate_value7 = TungstenAggregate_aggKey4.getLong(0);
/* 140 */
/* 141 */       /* input[0, bigint] */
/* 142 */
/* 143 */       TungstenAggregate_rowWriter12.write(0, TungstenAggregate_value7);
/* 144 */       currentRow = TungstenAggregate_result10;
/* 145 */       return;
/* 146 */
/* 147 */
/* 148 */     }
/* 149 */
/* 150 */     TungstenAggregate_hashMap2.free();
/* 151 */
/* 152 */   }
/* 153 */ }
/* 154 */

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #50400 has finished for PR 10855 at commit d3c2406.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Copy link
Contributor

nongli commented Jan 29, 2016

LGTM

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #2477 has finished for PR 10855 at commit d3c2406.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #50412 has finished for PR 10855 at commit 940c88d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 29, 2016

Test build #2479 has finished for PR 10855 at commit 940c88d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 30, 2016

Test build #2482 has finished for PR 10855 at commit 940c88d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 30, 2016

Test build #50420 has finished for PR 10855 at commit caad24f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 30, 2016

Test build #2483 has finished for PR 10855 at commit caad24f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Copy link
Contributor Author

davies commented Jan 30, 2016

Merging this into master.

@asfgit asfgit closed this in e6a02c6 Jan 30, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants