
[SPARK-32914][SQL] Avoid constructing dataType multiple times #29790

Closed
wangyum wants to merge 5 commits into apache:master from wangyum:SPARK-32914

Conversation

@wangyum
Member

@wangyum wangyum commented Sep 17, 2020

What changes were proposed in this pull request?

Some expressions' data types are not static values; a new object is constructed each time dataType is called, e.g. CaseWhen.
We should avoid constructing the dataType multiple times because it may be called many times, e.g. by HyperLogLogPlusPlus.update.
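
The change can be sketched in isolation (a minimal standalone example with hypothetical types, not the actual Catalyst classes): the body that builds the data type is moved into a private lazy val, so the result object is constructed once and reused on every subsequent call.

```scala
// Minimal sketch of the caching pattern (hypothetical types, not the
// real Catalyst API). Before the change, dataType would build a fresh
// object on every call; after, the result is cached in a lazy val.
sealed trait DataType
case object LongType extends DataType
case class StructType(fields: Seq[(String, DataType)]) extends DataType

class CaseWhenLike(branchTypes: Seq[DataType]) {
  // Constructed once on first access, then reused on every call.
  private lazy val internalDataType: DataType =
    StructType(branchTypes.zipWithIndex.map { case (dt, i) => (s"c$i", dt) })

  def dataType: DataType = internalDataType
}
```

Repeated calls now return the same instance (`e.dataType eq e.dataType`), where previously each call would have allocated a new object.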

Why are the changes needed?

Improve query performance. For example:

spark.range(100000000L).selectExpr("approx_count_distinct(case when id % 400 > 20 then id else 0 end)").show

Profiling result:

-- Execution profile ---
Total samples       : 18365

Frame buffer usage  : 2.6688%

--- 58443254327 ns (31.82%), 5844 samples
  [ 0] GenericTaskQueueSet<OverflowTaskQueue<StarTask, (MemoryType)1, 131072u>, (MemoryType)1>::steal_best_of_2(unsigned int, int*, StarTask&)
  [ 1] StealTask::do_it(GCTaskManager*, unsigned int)
  [ 2] GCTaskThread::run()
  [ 3] java_start(Thread*)
  [ 4] start_thread

--- 6140668667 ns (3.34%), 614 samples
  [ 0] GenericTaskQueueSet<OverflowTaskQueue<StarTask, (MemoryType)1, 131072u>, (MemoryType)1>::peek()
  [ 1] ParallelTaskTerminator::offer_termination(TerminatorTerminator*)
  [ 2] StealTask::do_it(GCTaskManager*, unsigned int)
  [ 3] GCTaskThread::run()
  [ 4] java_start(Thread*)
  [ 5] start_thread

--- 5679994036 ns (3.09%), 568 samples
  [ 0] scala.collection.generic.Growable.$plus$plus$eq
  [ 1] scala.collection.generic.Growable.$plus$plus$eq$
  [ 2] scala.collection.mutable.ListBuffer.$plus$plus$eq
  [ 3] scala.collection.mutable.ListBuffer.$plus$plus$eq
  [ 4] scala.collection.generic.GenericTraversableTemplate.$anonfun$flatten$1
  [ 5] scala.collection.generic.GenericTraversableTemplate$$Lambda$107.411506101.apply
  [ 6] scala.collection.immutable.List.foreach
  [ 7] scala.collection.generic.GenericTraversableTemplate.flatten
  [ 8] scala.collection.generic.GenericTraversableTemplate.flatten$
  [ 9] scala.collection.AbstractTraversable.flatten
  [10] org.apache.spark.internal.config.ConfigEntry.readString
  [11] org.apache.spark.internal.config.ConfigEntryWithDefault.readFrom
  [12] org.apache.spark.sql.internal.SQLConf.getConf
  [13] org.apache.spark.sql.internal.SQLConf.caseSensitiveAnalysis
  [14] org.apache.spark.sql.types.DataType.sameType
  [15] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.$anonfun$haveSameType$1
  [16] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.$anonfun$haveSameType$1$adapted
  [17] org.apache.spark.sql.catalyst.analysis.TypeCoercion$$$Lambda$1527.1975399904.apply
  [18] scala.collection.IndexedSeqOptimized.prefixLengthImpl
  [19] scala.collection.IndexedSeqOptimized.forall
  [20] scala.collection.IndexedSeqOptimized.forall$
  [21] scala.collection.mutable.ArrayBuffer.forall
  [22] org.apache.spark.sql.catalyst.analysis.TypeCoercion$.haveSameType
  [23] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck
  [24] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataTypeCheck$
  [25] org.apache.spark.sql.catalyst.expressions.CaseWhen.dataTypeCheck
  [26] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType
  [27] org.apache.spark.sql.catalyst.expressions.ComplexTypeMergingExpression.dataType$
  [28] org.apache.spark.sql.catalyst.expressions.CaseWhen.dataType
  [29] org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus.update
  [30] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$2
  [31] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$2$adapted
  [32] org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1$$Lambda$1534.1383512673.apply
  [33] org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7
  [34] org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted
  [35] org.apache.spark.sql.execution.aggregate.AggregationIterator$$Lambda$1555.725788712.apply

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manual test and benchmark test:

| Benchmark code | Before this PR (ms) | After this PR (ms) |
| --- | --- | --- |
| `spark.range(100000000L).selectExpr("approx_count_distinct(case when id % 400 > 20 then id else 0 end)").collect()` | 56462 | 3794 |

@SparkQA

SparkQA commented Sep 17, 2020

Test build #128834 has finished for PR 29790 at commit 906d2e0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 17, 2020

Test build #128836 has finished for PR 29790 at commit 6a9c01f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

do we really need such an invasive change? If there is a specific expression that calls dataType many times, let's fix that expression only. Or if this can bring significant end-to-end perf speedup, we can consider accepting it.

@cloud-fan
Contributor

Or if an expression has a very complicated def dataType, can we change it to lazy val dataType?
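
The tradeoff behind this suggestion can be demonstrated with a standalone counter (hypothetical names, nothing Spark-specific): a def re-evaluates its body on every call, while a lazy val evaluates it once on first access and caches the result.

```scala
// Demonstration of def vs. lazy val evaluation (hypothetical example).
class EvalCount {
  var defCalls = 0
  var lazyCalls = 0

  // Re-evaluated on every call.
  def viaDef: List[Int] = { defCalls += 1; List(1, 2, 3) }

  // Evaluated once on first access; the result is cached afterwards.
  lazy val viaLazyVal: List[Int] = { lazyCalls += 1; List(1, 2, 3) }
}
```

Calling viaDef three times runs its body three times; reading viaLazyVal three times runs the body once. The cost is an extra field plus synchronization on first access, which only pays off when the body is non-trivial.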

@SparkQA

SparkQA commented Sep 18, 2020

Test build #128864 has finished for PR 29790 at commit f5f3af5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 22, 2020

Test build #128957 has finished for PR 29790 at commit 5755273.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Member Author

wangyum commented Sep 22, 2020

retest this please

extends SubqueryExpression(plan, children, exprId) with Unevaluable {
override def dataType: DataType = {

private lazy val internalDataType: DataType = {
Contributor


does this need to be a lazy val? seems a very cheap method.

Member Author


I reverted this change because I did not find an expression that calls this method many times.

@transient private lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]

override def dataType: DataType = {
private lazy val internalDataType: DataType = {
Contributor


is it expensive? it just creates a few objects.

Member Author

@wangyum wangyum Sep 22, 2020


This is to improve this case:
(profiling screenshot)

| Benchmark code | Before this PR (ms) | After this PR (ms) |
| --- | --- | --- |
| `spark.range(100000000L).selectExpr("approx_count_distinct(map_entries(map(1, id)))").collect()` | 21787 | 15551 |

@SparkQA

SparkQA commented Sep 22, 2020

Test build #128970 has finished for PR 29790 at commit 5755273.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 22, 2020

Test build #128983 has finished for PR 29790 at commit 649d3c2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 22, 2020

Test build #128985 has finished for PR 29790 at commit f2dc664.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum changed the title from [SPARK-32914][SQL] Avoid calling dataType multiple times for each expression to [SPARK-32914][SQL] Avoid constructing dataType multiple times on Sep 28, 2020
@SparkQA

SparkQA commented Sep 28, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33789/

@SparkQA

SparkQA commented Sep 28, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33789/

@SparkQA

SparkQA commented Sep 28, 2020

Test build #129174 has finished for PR 29790 at commit 6a8877d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@transient
lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)

private lazy val internalDataType: DataType = {
Contributor


can we put it right before the line of override def dataType: DataType?

@SparkQA

SparkQA commented Sep 29, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33855/

@SparkQA

SparkQA commented Sep 29, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33855/

@SparkQA

SparkQA commented Sep 29, 2020

Test build #129238 has finished for PR 29790 at commit 37d0786.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@wangyum wangyum deleted the SPARK-32914 branch October 6, 2020 04:10