Skip to content

Conversation

@ashangit
Copy link

@ashangit ashangit commented Mar 1, 2017

Reverts #27

Tests are currently failling.
As we need to have this branch working for other MR we should revert it

@AnthonyTruchet AnthonyTruchet merged commit e8fd5d8 into criteo-1.6 Mar 1, 2017
superbobry pushed a commit that referenced this pull request Apr 10, 2017
…boxing/unboxing

## What changes were proposed in this pull request?

This PR improve performance of Dataset.map() for primitive types by removing boxing/unbox operations. This is based on [the discussion](apache#16391 (comment)) with cloud-fan.

Current Catalyst generates a method call to a `apply()` method of an anonymous function written in Scala. The types of an argument and return value are `java.lang.Object`. As a result, each method call for a primitive value involves a pair of unboxing and boxing for calling this `apply()` method and a pair of boxing and unboxing for returning from this `apply()` method.

This PR directly calls a specialized version of a `apply()` method without boxing and unboxing. For example, if types of an arguments ant return value is `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.

The following is a benchmark result using [this program](https://github.com/apache/spark/pull/16391/files) with 4.7x. Here is a Dataset part of this program.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1923 / 1952         52.0          19.2       1.0X
DataFrame                                      526 /  548        190.2           5.3       3.7X
Dataset                                       3094 / 3154         32.3          30.9       0.6X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
RDD                                           1883 / 1892         53.1          18.8       1.0X
DataFrame                                      502 /  642        199.1           5.0       3.7X
Dataset                                        657 /  784        152.2           6.6       2.9X
```

```java
  def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
    import spark.implicits._
    val rdd = spark.sparkContext.range(0, numRows)
    val ds = spark.range(0, numRows)
    val func = (l: Long) => l + 1
    val benchmark = new Benchmark("back-to-back map", numRows)
...
    benchmark.addCase("Dataset") { iter =>
      var res = ds.as[Long]
      var i = 0
      while (i < numChains) {
        res = res.map(func)
        i += 1
      }
      res.queryExecution.toRdd.foreach(_ => Unit)
    }
    benchmark
  }
```

A motivating example
```java
Seq(1, 2, 3).toDS.map(i => i * 7).show
```

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           Object mapelements_funcResult = null;
/* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 055 */           if (mapelements_funcResult == null) {
/* 056 */             mapelements_isNull = true;
/* 057 */           } else {
/* 058 */             mapelements_value = (Integer) mapelements_funcResult;
/* 059 */           }
/* 060 */
/* 061 */         }
/* 062 */
/* 063 */       }
/* 064 */
/* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 066 */
/* 067 */       if (mapelements_isNull) {
/* 068 */         serializefromobject_rowWriter.setNullAt(0);
/* 069 */       } else {
/* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 071 */       }
/* 072 */       append(serializefromobject_result);
/* 073 */       if (shouldStop()) return;
/* 074 */     }
/* 075 */   }
/* 076 */ }
```

Generated code with this PR (lines 48-56 are changed)
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */
/* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 059 */
/* 060 */       if (mapelements_isNull) {
/* 061 */         serializefromobject_rowWriter.setNullAt(0);
/* 062 */       } else {
/* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 064 */       }
/* 065 */       append(serializefromobject_result);
/* 066 */       if (shouldStop()) return;
/* 067 */     }
/* 068 */   }
/* 069 */ }
```

Java bytecode for methods for `i => i * 7`
```java
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29                 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31                 // Method apply:(I)I
       8: invokestatic  #35                 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42                 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}
```
## How was this patch tested?

Added new test suites to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#17172 from kiszk/SPARK-19008.
@Willymontaz Willymontaz deleted the revert-27-spark-history-ui-backport branch April 2, 2019 15:07
jetoile pushed a commit that referenced this pull request Mar 13, 2024
…mand

### What changes were proposed in this pull request?

This PR proposes to sort table properties in DESCRIBE TABLE command. This is consistent with DSv2 command as well:
https://github.com/apache/spark/blob/e3058ba17cb4512537953eb4ded884e24ee93ba2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala#L63

This PR fixes the test case in Scala 2.13 build as well where the table properties have different order in the map.

### Why are the changes needed?

To keep the deterministic and pretty output, and fix the tests in Scala 2.13 build.
See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2-scala-2.13/49/testReport/junit/org.apache.spark.sql/SQLQueryTestSuite/describe_sql/

```
describe.sql&#10;Expected "...spark_catalog, view.[query.out.col.2=c, view.referredTempFunctionsNames=[], view.catalogAndNamespace.part.1=default]]", but got "...spark_catalog, view.[catalogAndNamespace.part.1=default, view.query.out.col.2=c, view.referredTempFunctionsNames=[]]]" Result did not match for query #29&#10;DESC FORMATTED v
```

### Does this PR introduce _any_ user-facing change?

Yes, it will change the text output from `DESCRIBE [EXTENDED|FORMATTED] table_name`.
Now the table properties are sorted by its key.

### How was this patch tested?

Related unittests were fixed accordingly.

Closes apache#30799 from HyukjinKwon/SPARK-33803.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 7845865)
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants