Skip to content
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.sql.catalyst.plans.logical

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CatalystConf
Expand Down Expand Up @@ -301,6 +304,11 @@ abstract class LeafNode extends LogicalPlan {
override def producedAttributes: AttributeSet = outputSet
}

object UnaryNode {
private[spark] lazy val taskSupport =
new ForkJoinTaskSupport(new ForkJoinPool(8))
}

/**
* A logical plan node with single child.
*/
Expand All @@ -314,19 +322,29 @@ abstract class UnaryNode extends LogicalPlan {
* expressions with the corresponding alias
*/
protected def getAliasedConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
projectList.foreach {
case a @ Alias(e, _) =>
// For every alias in `projectList`, replace the reference in constraints by its attribute.
allConstraints ++= allConstraints.map(_ transform {
case expr: Expression if expr.semanticEquals(e) =>
a.toAttribute
})
allConstraints += EqualNullSafe(e, a.toAttribute)
case _ => // Don't change.
}

allConstraints -- child.constraints
val relativeReferences = AttributeSet(projectList.collect {
case a: Alias => a
}.flatMap(_.references))
val parAllConstraints = child.constraints.asInstanceOf[Set[Expression]].filter { constraint =>
constraint.references.intersect(relativeReferences).nonEmpty
}.par
parAllConstraints.tasksupport = UnaryNode.taskSupport

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we using a custom task support instead of the default (which uses the global fork-join executor)?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do they have the same parallelism level? BTW, I saw the parallel collection used in other places in Spark all take custom task support.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether they do or not depends on the implementation of the default task support. But even if they use the same level of parallelism, they're distinct executors. Which means they won't share a common thread pool or task queue. I don't know why Spark would use custom task support in other places. It may be to avoid engaging all of the CPU cores on the host machine. But then it seems more efficient for Spark to have its own global task support.


parAllConstraints.flatMap { constraint =>
var partConstraints = Set(constraint)
projectList.foreach {
case a @ Alias(e, _) =>
// For every alias in `projectList`, replace the reference in constraints
// by its attribute.
partConstraints ++= partConstraints.map(_ transform {
case expr: Expression if expr.semanticEquals(e) =>
a.toAttribute
})
partConstraints += EqualNullSafe(e, a.toAttribute)
case _ => // Don't change.
}
partConstraints
}.seq -- child.constraints
}

override protected def validConstraints: Set[Expression] = child.constraints
Expand Down