
Update upstream #20

Merged
GulajavaMinistudio merged 5 commits into GulajavaMinistudio:master from apache:master
Apr 18, 2017

Conversation

@GulajavaMinistudio
Owner

What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

HyukjinKwon and others added 5 commits April 17, 2017 09:04
…m_json function in R

## What changes were proposed in this pull request?

This was suggested to be `as.json.array` in the first place in the PR for SPARK-19828, but we could not do this as the lint check emits an error for multiple dots in variable names.

After SPARK-20278, we are now able to use `multiple.dots.in.names`. The `asJsonArray` option in the `from_json` function can still be changed, as 2.2 has not been released yet.

So, this PR proposes to rename `asJsonArray` to `as.json.array`.

## How was this patch tested?

Jenkins tests, local tests with `./R/run-tests.sh` and manual `./dev/lint-r`. Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17653 from HyukjinKwon/SPARK-19828-followup.
…ing persistent functions

### What changes were proposed in this pull request?
The session catalog caches some persistent functions in the `FunctionRegistry`, so there can be duplicates. Our Catalog API `listFunctions` does not handle them.

It would be better if the `SessionCatalog` API could de-duplicate the records instead of leaving it to each API caller. In `FunctionRegistry`, our functions are identified by unquoted strings. Thus, this PR parses them using our parser interface and then de-duplicates the names.
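
A minimal Scala sketch of the de-duplication idea (hypothetical; `FunctionIdent` and `parseName` stand in for the real parser interface and are not the actual `SessionCatalog` code):

```scala
// Parse each raw, unquoted name into a structured identifier, then keep one entry per identifier.
// Sketch only: ignores quoting and names with more than two parts.
case class FunctionIdent(database: Option[String], name: String)

def parseName(raw: String): FunctionIdent = raw.split('.') match {
  case Array(db, fn) => FunctionIdent(Some(db.toLowerCase), fn.toLowerCase)
  case Array(fn)     => FunctionIdent(None, fn.toLowerCase)
}

def listFunctionsDeduplicated(rawNames: Seq[String]): Seq[FunctionIdent] =
  rawNames.map(parseName).distinct
```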

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17646 from gatorsmile/showFunctions.
## What changes were proposed in this pull request?

This patch fixes a bug in the way LIKE patterns are translated to Java regexes. The bug causes any character following an escaped backslash to also be escaped, i.e. there is double-escaping.
A concrete example is the following pattern: `'%\\%'`. The expected Java regex that this pattern should correspond to (according to the behavior described below) is `'.*\\.*'`; however, the current implementation produces `'.*\\%'` instead.
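
A minimal Scala sketch of escape-aware translation (an illustration of the intended behaviour, not the patch's actual code; `likeToJavaRegex` is a hypothetical helper):

```scala
import java.util.regex.Pattern

// Consume the escape character and the character it escapes as one unit, so a literal
// backslash ("\\" in the LIKE pattern) does not also escape the wildcard that follows it.
def likeToJavaRegex(pattern: String): String = {
  val sb = new StringBuilder
  var i = 0
  while (i < pattern.length) {
    pattern.charAt(i) match {
      case '\\' if i + 1 < pattern.length =>
        sb.append(Pattern.quote(pattern.charAt(i + 1).toString)); i += 2 // escaped char is a literal
      case '%' => sb.append(".*"); i += 1
      case '_' => sb.append("."); i += 1
      case c   => sb.append(Pattern.quote(c.toString)); i += 1
    }
  }
  sb.toString
}

// likeToJavaRegex("""%\\%""") yields a regex equivalent to '.*\\.*':
// any prefix, a literal backslash, any suffix.
```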

---

Update: in light of the discussion that ensued, we should explicitly define the expected behaviour of LIKE expressions, especially in certain edge cases. With the help of gatorsmile, we put together a list of different RDBMSs and how they vary with respect to certain standard features.

| RDBMS\Features | Wildcards | Default escape [1] | Case sensitivity |
| --- | --- | --- | --- |
| [MS SQL Server](https://msdn.microsoft.com/en-us/library/ms179859.aspx) | _, %, [], [^] | none | no |
| [Oracle](https://docs.oracle.com/cd/B12037_01/server.101/b10759/conditions016.htm) | _, % | none | yes |
| [DB2 z/OS](http://www.ibm.com/support/knowledgecenter/SSEPEK_11.0.0/sqlref/src/tpc/db2z_likepredicate.html) | _, % | none | yes |
| [MySQL](http://dev.mysql.com/doc/refman/5.7/en/string-comparison-functions.html) | _, % | none | no |
| [PostgreSQL](https://www.postgresql.org/docs/9.0/static/functions-matching.html) | _, % | \ | yes |
| [Hive](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF) | _, % | none | yes |
| Current Spark | _, % | \ | yes |

[1] Default escape character: most systems do not have a default escape character; instead, the user can specify one by calling a LIKE expression with an escape argument, `[A] LIKE [B] ESCAPE [C]`. This syntax is currently not supported by Spark; however, I would volunteer to implement this feature in a separate ticket.

The specifications are often quite terse and certain scenarios are undocumented, so here is a list of scenarios that I am uncertain about and would appreciate any input. Specifically I am looking for feedback on whether or not Spark's current behavior should be changed.
1. [x] Ending a pattern with the escape sequence, e.g. `like 'a\'`.
   PostgreSQL gives an error: 'LIKE pattern must not end with escape character', which I personally find logical. Currently, Spark allows "non-terminated" escapes and simply ignores them as part of the pattern.
   According to [DB2's documentation](http://www.ibm.com/support/knowledgecenter/SSEPGG_9.7.0/com.ibm.db2.luw.messages.sql.doc/doc/msql00130n.html), ending a pattern in an escape character is invalid.
   _Proposed new behaviour in Spark: throw AnalysisException_
2. [x] Empty input, e.g. `'' like ''`
   Postgres and DB2 will match empty input only if the pattern is empty as well; any other combination of empty input will not match. Spark currently follows this rule.
3. [x] Escape before a non-special character, e.g. `'a' like '\a'`.
   Escaping a non-wildcard character is not really documented but PostgreSQL just treats it verbatim, which I also find the least surprising behavior. Spark does the same.
   According to [DB2's documentation](http://www.ibm.com/support/knowledgecenter/SSEPGG_9.7.0/com.ibm.db2.luw.messages.sql.doc/doc/msql00130n.html), it is invalid to follow an escape character with anything other than an escape character, an underscore or a percent sign.
   _Proposed new behaviour in Spark: throw AnalysisException_

The current specification is also described in the operator's source code in this patch.
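
To illustrate the two proposed `AnalysisException` cases, here is a small, hypothetical validation sketch in Scala (it is not the patch's code, and a plain exception stands in for `AnalysisException`):

```scala
// Reject a pattern that ends with the escape character, or that escapes anything other than
// the escape character itself, '_' or '%'.
def validateLikePattern(pattern: String, escape: Char = '\\'): Unit = {
  var i = 0
  while (i < pattern.length) {
    if (pattern.charAt(i) == escape) {
      if (i + 1 >= pattern.length) {
        throw new IllegalArgumentException("the LIKE pattern must not end with the escape character")
      }
      val next = pattern.charAt(i + 1)
      if (next != escape && next != '_' && next != '%') {
        throw new IllegalArgumentException(s"invalid escape sequence '$escape$next' in the LIKE pattern")
      }
      i += 2
    } else {
      i += 1
    }
  }
}
```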
## How was this patch tested?

Extra case in regex unit tests.

Author: Jakob Odersky <jakob@odersky.com>

This patch had conflicts when merged, resolved by
Committer: Reynold Xin <rxin@databricks.com>

Closes #15398 from jodersky/SPARK-17647.
## What changes were proposed in this pull request?

Typo fix: distitrbuted -> distributed

## How was this patch tested?

Existing tests

Author: Andrew Ash <andrew@andrewash.com>

Closes #17664 from ash211/patch-1.
…itionSuite

## What changes were proposed in this pull request?

Replace non-existent `repartitionBy` with `distribute` in `CollapseRepartitionSuite`.

## How was this patch tested?

local build and `catalyst/testOnly *CollapseRepartitionSuite`

Author: Jacek Laskowski <jacek@japila.pl>

Closes #17657 from jaceklaskowski/CollapseRepartitionSuite.
@GulajavaMinistudio GulajavaMinistudio merged commit c05b451 into GulajavaMinistudio:master Apr 18, 2017
GulajavaMinistudio pushed a commit that referenced this pull request Jan 16, 2020
…ateExpression

### What changes were proposed in this pull request?

This pr intends to add filter information in the explain output of an aggregate (This is a follow-up of apache#26656).

Without this pr:
```
scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true)
== Parsed Logical Plan ==
'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)]
+- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
k: int, sum(v): bigint
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L]
+- SubqueryAlias `default`.`t`
   +- Relation[k#0,v#1] parquet

== Optimized Logical Plan ==
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) AS sum(v)#3L]
+- Relation[k#0,v#1] parquet

== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v)#3L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#20]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint))], output=[k#0, sum#7L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>

scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show()
+---+------+
|  k|sum(v)|
+---+------+
+---+------+
```

With this pr:
```
scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").explain(true)
== Parsed Logical Plan ==
'Aggregate ['k], ['k, unresolvedalias('SUM('v, ('v > 3)), None)]
+- 'UnresolvedRelation [t]

== Analyzed Logical Plan ==
k: int, sum(v) FILTER (v > 3): bigint
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L]
+- SubqueryAlias `default`.`t`
   +- Relation[k#0,v#1] parquet

== Optimized Logical Plan ==
Aggregate [k#0], [k#0, sum(cast(v#1 as bigint)) filter (v#1 > 3) AS sum(v) FILTER (v > 3)#5L]
+- Relation[k#0,v#1] parquet

== Physical Plan ==
HashAggregate(keys=[k#0], functions=[sum(cast(v#1 as bigint))], output=[k#0, sum(v) FILTER (v > 3)#5L])
+- Exchange hashpartitioning(k#0, 200), true, [id=#20]
   +- HashAggregate(keys=[k#0], functions=[partial_sum(cast(v#1 as bigint)) filter (v#1 > 3)], output=[k#0, sum#9L])
      +- *(1) ColumnarToRow
         +- FileScan parquet default.t[k#0,v#1] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<k:int,v:int>

scala> sql("select k, SUM(v) filter (where v > 3) from t group by k").show()
+---+---------------------+
|  k|sum(v) FILTER (v > 3)|
+---+---------------------+
+---+---------------------+
```

### Why are the changes needed?

For better usability.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually.

Closes apache#27198 from maropu/SPARK-27986-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
GulajavaMinistudio pushed a commit that referenced this pull request Dec 20, 2020
…ries if AQE is enabled

### What changes were proposed in this pull request?

This PR fixes an issue where, when AQE is enabled, EXPLAIN FORMATTED doesn't show the plan for subqueries.

```scala
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("FORMATTED")

== Physical Plan ==
AdaptiveSparkPlan (3)
+- Project (2)
 +- Scan OneRowRelation (1)

(1) Scan OneRowRelation
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project
Output [1]: [Subquery subquery#3, [id=#20] AS scalarsubquery()#5L]
Input: []

(3) AdaptiveSparkPlan
Output [1]: [scalarsubquery()#5L]
Arguments: isFinalPlan=false
```

After this change, the plan for the subquery is shown.
```scala
== Physical Plan ==
* Project (2)
+- * Scan OneRowRelation (1)

(1) Scan OneRowRelation [codegen id : 1]
Output: []
Arguments: ParallelCollectionRDD[0] at explain at <console>:24, OneRowRelation, UnknownPartitioning(0)

(2) Project [codegen id : 1]
Output [1]: [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
Input: []

===== Subqueries =====

Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery scalar-subquery#3, [id=#24]
* HashAggregate (6)
+- Exchange (5)
   +- * HashAggregate (4)
      +- * Range (3)

(3) Range [codegen id : 1]
Output [1]: [id#0L]
Arguments: Range (1, 100, step=1, splits=Some(12))

(4) HashAggregate [codegen id : 1]
Input [1]: [id#0L]
Keys: []
Functions [1]: [partial_min(id#0L)]
Aggregate Attributes [1]: [min#7L]
Results [1]: [min#8L]

(5) Exchange
Input [1]: [min#8L]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#20]

(6) HashAggregate [codegen id : 2]
Input [1]: [min#8L]
Keys: []
Functions [1]: [min(id#0L)]
Aggregate Attributes [1]: [min(id#0L)#4L]
Results [1]: [min(id#0L)#4L AS v#2L]
```

### Why are the changes needed?

For better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes. Users can see the formatted plan for subqueries.

### How was this patch tested?

New test.

Closes apache#30855 from sarutak/fix-aqe-explain.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
GulajavaMinistudio pushed a commit that referenced this pull request Dec 22, 2020
…subquery code

### What changes were proposed in this pull request?

This PR fixes an issue where `EXPLAIN CODEGEN` and `BenchmarkQueryTest` don't show the corresponding code for subqueries.

The following example is about `EXPLAIN CODEGEN`.
```
spark.conf.set("spark.sql.adaptive.enabled", "false")
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN")

scala> spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN")
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:55; maxConstantPoolSize:97(0.15% used); numInnerClasses:0) ==
*(1) Project [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
:  +- Subquery scalar-subquery#3, [id=#24]
:     +- *(2) HashAggregate(keys=[], functions=[min(id#0L)], output=[v#2L])
:        +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#20]
:           +- *(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L])
:              +- *(1) Range (1, 100, step=1, splits=12)
+- *(1) Scan OneRowRelation[]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator rdd_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */     this.references = references;
/* 014 */   }
/* 015 */
/* 016 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */     partitionIndex = index;
/* 018 */     this.inputs = inputs;
/* 019 */     rdd_input_0 = inputs[0];
/* 020 */     project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 021 */
/* 022 */   }
/* 023 */
/* 024 */   private void project_doConsume_0() throws java.io.IOException {
/* 025 */     // common sub-expressions
/* 026 */
/* 027 */     project_mutableStateArray_0[0].reset();
/* 028 */
/* 029 */     if (false) {
/* 030 */       project_mutableStateArray_0[0].setNullAt(0);
/* 031 */     } else {
/* 032 */       project_mutableStateArray_0[0].write(0, 1L);
/* 033 */     }
/* 034 */     append((project_mutableStateArray_0[0].getRow()));
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   protected void processNext() throws java.io.IOException {
/* 039 */     while ( rdd_input_0.hasNext()) {
/* 040 */       InternalRow rdd_row_0 = (InternalRow) rdd_input_0.next();
/* 041 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 042 */       project_doConsume_0();
/* 043 */       if (shouldStop()) return;
/* 044 */     }
/* 045 */   }
/* 046 */
/* 047 */ }
```

After this change, the corresponding code for subqueries is shown.
```
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 (maxMethodCodeSize:282; maxConstantPoolSize:206(0.31% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L])
+- *(1) Range (1, 100, step=1, splits=12)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg_0;
/* 010 */   private boolean agg_bufIsNull_0;
/* 011 */   private long agg_bufValue_0;
/* 012 */   private boolean range_initRange_0;
/* 013 */   private long range_nextIndex_0;
/* 014 */   private TaskContext range_taskContext_0;
/* 015 */   private InputMetrics range_inputMetrics_0;
/* 016 */   private long range_batchEnd_0;
/* 017 */   private long range_numElementsTodo_0;
/* 018 */   private boolean agg_agg_isNull_2_0;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
/* 020 */
/* 021 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 022 */     this.references = references;
/* 023 */   }
/* 024 */
/* 025 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 026 */     partitionIndex = index;
/* 027 */     this.inputs = inputs;
/* 028 */
/* 029 */     range_taskContext_0 = TaskContext.get();
/* 030 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 031 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 033 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 034 */
/* 035 */   }
/* 036 */
/* 037 */   private void agg_doAggregateWithoutKey_0() throws java.io.IOException {
/* 038 */     // initialize aggregation buffer
/* 039 */     agg_bufIsNull_0 = true;
/* 040 */     agg_bufValue_0 = -1L;
/* 041 */
/* 042 */     // initialize Range
/* 043 */     if (!range_initRange_0) {
/* 044 */       range_initRange_0 = true;
/* 045 */       initRange(partitionIndex);
/* 046 */     }
/* 047 */
/* 048 */     while (true) {
/* 049 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 050 */         long range_nextBatchTodo_0;
/* 051 */         if (range_numElementsTodo_0 > 1000L) {
/* 052 */           range_nextBatchTodo_0 = 1000L;
/* 053 */           range_numElementsTodo_0 -= 1000L;
/* 054 */         } else {
/* 055 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 056 */           range_numElementsTodo_0 = 0;
/* 057 */           if (range_nextBatchTodo_0 == 0) break;
/* 058 */         }
/* 059 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 060 */       }
/* 061 */
/* 062 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 063 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 064 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 065 */
/* 066 */         agg_doConsume_0(range_value_0);
/* 067 */
/* 068 */         // shouldStop check is eliminated
/* 069 */       }
/* 070 */       range_nextIndex_0 = range_batchEnd_0;
/* 071 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 072 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 073 */       range_taskContext_0.killTaskIfInterrupted();
/* 074 */     }
/* 075 */
/* 076 */   }
/* 077 */
/* 078 */   private void initRange(int idx) {
/* 079 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 080 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(12L);
/* 081 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(99L);
/* 082 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 083 */     java.math.BigInteger start = java.math.BigInteger.valueOf(1L);
/* 084 */     long partitionEnd;
/* 085 */
/* 086 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 087 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 088 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 089 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 090 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 091 */     } else {
/* 092 */       range_nextIndex_0 = st.longValue();
/* 093 */     }
/* 094 */     range_batchEnd_0 = range_nextIndex_0;
/* 095 */
/* 096 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 097 */     .multiply(step).add(start);
/* 098 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 099 */       partitionEnd = Long.MAX_VALUE;
/* 100 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 101 */       partitionEnd = Long.MIN_VALUE;
/* 102 */     } else {
/* 103 */       partitionEnd = end.longValue();
/* 104 */     }
/* 105 */
/* 106 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 107 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 108 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 109 */     if (range_numElementsTodo_0 < 0) {
/* 110 */       range_numElementsTodo_0 = 0;
/* 111 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 112 */       range_numElementsTodo_0++;
/* 113 */     }
/* 114 */   }
/* 115 */
/* 116 */   private void agg_doConsume_0(long agg_expr_0_0) throws java.io.IOException {
/* 117 */     // do aggregate
/* 118 */     // common sub-expressions
/* 119 */
/* 120 */     // evaluate aggregate functions and update aggregation buffers
/* 121 */
/* 122 */     agg_agg_isNull_2_0 = true;
/* 123 */     long agg_value_2 = -1L;
/* 124 */
/* 125 */     if (!agg_bufIsNull_0 && (agg_agg_isNull_2_0 ||
/* 126 */         agg_value_2 > agg_bufValue_0)) {
/* 127 */       agg_agg_isNull_2_0 = false;
/* 128 */       agg_value_2 = agg_bufValue_0;
/* 129 */     }
/* 130 */
/* 131 */     if (!false && (agg_agg_isNull_2_0 ||
/* 132 */         agg_value_2 > agg_expr_0_0)) {
/* 133 */       agg_agg_isNull_2_0 = false;
/* 134 */       agg_value_2 = agg_expr_0_0;
/* 135 */     }
/* 136 */
/* 137 */     agg_bufIsNull_0 = agg_agg_isNull_2_0;
/* 138 */     agg_bufValue_0 = agg_value_2;
/* 139 */
/* 140 */   }
/* 141 */
/* 142 */   protected void processNext() throws java.io.IOException {
/* 143 */     while (!agg_initAgg_0) {
/* 144 */       agg_initAgg_0 = true;
/* 145 */       long agg_beforeAgg_0 = System.nanoTime();
/* 146 */       agg_doAggregateWithoutKey_0();
/* 147 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000);
/* 148 */
/* 149 */       // output the result
/* 150 */
/* 151 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 152 */       range_mutableStateArray_0[2].reset();
/* 153 */
/* 154 */       range_mutableStateArray_0[2].zeroOutNullBytes();
/* 155 */
/* 156 */       if (agg_bufIsNull_0) {
/* 157 */         range_mutableStateArray_0[2].setNullAt(0);
/* 158 */       } else {
/* 159 */         range_mutableStateArray_0[2].write(0, agg_bufValue_0);
/* 160 */       }
/* 161 */       append((range_mutableStateArray_0[2].getRow()));
/* 162 */     }
/* 163 */   }
/* 164 */
/* 165 */ }
```

### Why are the changes needed?

For better debuggability.

### Does this PR introduce _any_ user-facing change?

Yes. After this change, users can see subquery code by `EXPLAIN CODEGEN`.

### How was this patch tested?

New test.

Closes apache#30859 from sarutak/explain-codegen-subqueries.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
GulajavaMinistudio pushed a commit that referenced this pull request May 9, 2023
### What changes were proposed in this pull request?

AQE cannot reuse a subquery if it is pushed into `InMemoryTableScan`. There are two issues:
- `ReuseAdaptiveSubquery` cannot reuse a subquery if two subqueries have the same exprId
- `InMemoryTableScan` misses applying `ReuseAdaptiveSubquery` when wrapped in `TableCacheQueryStageExec`

For example:
```
Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
Seq(2).toDF("c2").createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
```
There are two occurrences of `subquery#27` but no `ReusedSubquery`:

```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) Filter (c1#14 < Subquery subquery#27, [id=#20])
   :  +- Subquery subquery#27, [id=#20]
   :     +- AdaptiveSparkPlan isFinalPlan=true
   :        +- LocalTableScan [c2#25]
   +- TableCacheQueryStage 0
      +- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27, [id=#20])]
            :- InMemoryRelation [c1#14], StorageLevel(disk, memory, deserialized, 1 replicas)
            :     +- LocalTableScan [c1#14]
            +- Subquery subquery#27, [id=#20]
               +- AdaptiveSparkPlan isFinalPlan=true
                  +- LocalTableScan [c2#25]
```

### Why are the changes needed?

Improve the coverage of reuse subquery.

Note that it is not a real perf issue because the subquery has already been reused (the same Java object). This PR just makes the plan clearer about subquery reuse.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

add test

Closes apache#41046 from ulysses-you/aqe-subquery.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
GulajavaMinistudio pushed a commit that referenced this pull request Aug 23, 2023
…te` for Java 21

### What changes were proposed in this pull request?

SPARK-44507(apache#42130) updated `try_arithmetic.sql.out` and `numeric.sql.out`, SPARK-44868(apache#42534) updated `datetime-formatting.sql.out`, but these PRs didn’t pay attention to the test health on Java 21. So this PR has regenerated the golden files `try_arithmetic.sql.out.java21`, `numeric.sql.out.java21`, and `datetime-formatting.sql.out.java21` of `SQLQueryTestSuite` so that `SQLQueryTestSuite` can be tested with Java 21.

### Why are the changes needed?
Restore `SQLQueryTestSuite` to be tested with Java 21.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Manual checked:

```
java -version
openjdk version "21-ea" 2023-09-19
OpenJDK Runtime Environment Zulu21+69-CA (build 21-ea+28)
OpenJDK 64-Bit Server VM Zulu21+69-CA (build 21-ea+28, mixed mode, sharing)
```

```
SPARK_GENERATE_GOLDEN_FILES=0 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
```

**Before**

```
...
[info] - datetime-formatting.sql *** FAILED *** (316 milliseconds)
[info]   datetime-formatting.sql
[info]   Array("-- Automatically generated by SQLQueryTestSuite
[info]   ", "create temporary view v as select col from values
[info]    (timestamp '1582-06-01 11:33:33.123UTC+080000'),
[info]    (timestamp '1970-01-01 00:00:00.000Europe/Paris'),
[info]    (timestamp '1970-12-31 23:59:59.999Asia/Srednekolymsk'),
[info]    (timestamp '1996-04-01 00:33:33.123Australia/Darwin'),
[info]    (timestamp '2018-11-17 13:33:33.123Z'),
[info]    (timestamp '2020-01-01 01:33:33.123Asia/Shanghai'),
[info]    (timestamp '2100-01-01 01:33:33.123America/Los_Angeles') t(col)
[info]   ", "struct<>
[info]   ", "
[info]
[info]
[info]   ", "select col, date_format(col, 'G GG GGG GGGG') from v
[info]   ", "struct<col:timestamp,date_format(col, G GG GGG GGGG):string>
[info]   ", "1582-05-31 19:40:35.123	AD AD AD Anno Domini
[info]   1969-12-31 15:00:00	AD AD AD Anno Domini
[info]   1970-12-31 04:59:59.999	AD AD AD Anno Domini
[info]   1996-03-31 07:03:33.123	AD AD AD Anno Domini
[info]   2018-11-17 05:33:33.123	AD AD AD Anno Domini
[info]   2019-12-31 09:33:33.123	AD AD AD Anno Domini
[info]   2100-01-01 01:33:33.123	AD AD AD Anno Domini
[info]
[info]
[info]   ", "select col, date_format(col, 'y yy yyy yyyy yyyyy yyyyyy') from v
[info]   ", "struct<col:timestamp,date_format(col, y yy yyy yyyy yyyyy yyyyyy):string>
[info]   ", "1582-05-31 19:40:35.123	1582 82 1582 1582 01582 001582
[info]   1969-12-31 15:00:00	1969 69 1969 1969 01969 001969
[info]   1970-12-31 04:59:59.999	1970 70 1970 1970 01970 001970
[info]   1996-03-31 07:03:33.123	1996 96 1996 1996 01996 001996
[info]   2018-11-17 05:33:33.123	2018 18 2018 2018 02018 002018
[info]   2019-12-31 09:33:33.123	2019 19 2019 2019 02019 002019
[info]   2100-01-01 01:33:33.123	2100 00 2100 2100 02100 002100
[info]
...
[info] - postgreSQL/numeric.sql *** FAILED *** (35 seconds, 848 milliseconds)
[info]   postgreSQL/numeric.sql
[info]   Expected "...rg.apache.spark.sql.[]AnalysisException
[info]   {
[info]   ...", but got "...rg.apache.spark.sql.[catalyst.Extended]AnalysisException
[info]   {
[info]   ..." Result did not match for query #544
[info]   SELECT '' AS to_number_2,  to_number('-34,338,492.654,878', '99G999G999D999G999') (SQLQueryTestSuite.scala:876)
[info]   org.scalatest.exceptions.TestFailedException:
...
[info] - try_arithmetic.sql *** FAILED *** (314 milliseconds)
[info]   try_arithmetic.sql
[info]   Expected "...rg.apache.spark.sql.[]AnalysisException
[info]   {
[info]   ...", but got "...rg.apache.spark.sql.[catalyst.Extended]AnalysisException
[info]   {
[info]   ..." Result did not match for query #20
[info]   SELECT try_add(interval 2 year, interval 2 second) (SQLQueryTestSuite.scala:876)
[info]   org.scalatest.exceptions.TestFailedException:
```

**After**
```
[info] Run completed in 9 minutes, 10 seconds.
[info] Total number of tests run: 572
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 572, failed 0, canceled 0, ignored 59, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#42580 from LuciferYang/SPARK-44888.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
GulajavaMinistudio pushed a commit that referenced this pull request Apr 4, 2024
### What changes were proposed in this pull request?

This PR aims to enforce installing `six` for Python 3.10 because `Python 3.10` is missing `six`, which causes `Pandas` detection failures in CIs.
- https://github.com/apache/spark/actions/runs/8525063765/job/23373974516
   - Note that `pandas` is visible in the installed package list, but PySpark fails to detect it due to the missing `six`.

```
$ docker run -it --rm ghcr.io/apache/apache-spark-ci-image:master-8345361470 python3.9 -m pip freeze | grep six
six==1.16.0
$ docker run -it --rm ghcr.io/apache/apache-spark-ci-image:master-8345361470 python3.10 -m pip freeze | grep six
$ docker run -it --rm ghcr.io/apache/apache-spark-ci-image:master-8345361470 python3.11 -m pip freeze | grep six
six==1.16.0
$ docker run -it --rm ghcr.io/apache/apache-spark-ci-image:master-8345361470 python3.12 -m pip freeze | grep six
six==1.16.0
```

- CI failure message example.
  - https://github.com/apache/spark/actions/runs/8525063765/job/23373974096
```
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_classification (temp output: /__w/spark/spark/python/target/370eb2c4-12f2-411f-96d1-f617f5d59528/python3.10__pyspark.ml.tests.connect.test_connect_classification__v6itdsxy.log)
Traceback (most recent call last):
  File "/usr/lib/python3.10/runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.10/runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "/__w/spark/spark/python/pyspark/ml/tests/connect/test_connect_classification.py", line 37, in <module>
    class ClassificationTestsOnConnect(ClassificationTestsMixin, unittest.TestCase):
NameError: name 'ClassificationTestsMixin' is not defined
```

### Why are the changes needed?

Since Python 3.10 is the default Python version of Ubuntu OS, the behavior is different.
```
RUN python3.10 -m pip install numpy pyarrow>=15.0.0 six==1.16.0 ...
...
#20 0.766 Requirement already satisfied: six==1.16.0 in /usr/lib/python3/dist-packages (1.16.0)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Check the docker image built by this PR.
- https://github.com/dongjoon-hyun/spark/actions/runs/8533625657/job/23376659246

```
$ docker pull --platform amd64 ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-8533625657

$ docker run -it --rm ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-8533625657 python3.10 -m pip freeze | grep six
six==1.16.0
```

Run tests on new docker image.
```
$ docker run -it --rm -v $PWD:/spark ghcr.io/dongjoon-hyun/apache-spark-ci-image:master-8533625657
root@b7f5f56892b0:/# cd /spark
root@b7f5f56892b0:/spark# python/run-tests --modules=pyspark-mllib,pyspark-ml,pyspark-ml-connect --parallelism=1 --python-executables=python3.10
Running PySpark tests. Output is in /spark/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python modules: ['pyspark-mllib', 'pyspark-ml', 'pyspark-ml-connect']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.12
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_classification (temp output: /spark/python/target/675eccdc-3c4b-4146-a58b-030302bdc6d7/python3.10__pyspark.ml.tests.connect.test_connect_classification__9habp0rh.log)
Finished test(python3.10): pyspark.ml.tests.connect.test_connect_classification (159s)
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_evaluation (temp output: /spark/python/target/fbac93ba-c72d-40e4-acfe-f3ac01b4932a/python3.10__pyspark.ml.tests.connect.test_connect_evaluation__js11z0ux.log)
Finished test(python3.10): pyspark.ml.tests.connect.test_connect_evaluation (36s)
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_feature (temp output: /spark/python/target/fdb8828e-4241-4e78-a7d6-b2a4beb3cfc1/python3.10__pyspark.ml.tests.connect.test_connect_feature__et5gr30f.log)
Finished test(python3.10): pyspark.ml.tests.connect.test_connect_feature (30s)
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_function (temp output: /spark/python/target/e365e62f-a09b-483d-9101-fe9dfc0801f2/python3.10__pyspark.ml.tests.connect.test_connect_function__5e288azs.log)
Finished test(python3.10): pyspark.ml.tests.connect.test_connect_function (24s)
Starting test(python3.10): pyspark.ml.tests.connect.test_connect_pipeline (temp output: /spark/python/target/bdc167be-6d6e-4704-b840-cf5d23c4b21e/python3.10__pyspark.ml.tests.connect.test_connect_pipeline__63blw3o2.log)
...
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45832 from dongjoon-hyun/SPARK-47452-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>