sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -38,7 +38,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
* Interface for a hashed relation by some key. Use [[HashedRelation.apply]] to create a concrete
* object.
*/
private[joins] sealed trait HashedRelation {
private[execution] sealed trait HashedRelation {
def get(key: InternalRow): Seq[InternalRow]

// This is a helper method to implement Externalizable, and is used by
@@ -111,7 +111,7 @@ final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[InternalRow, InternalRow])
// TODO(rxin): a version of [[HashedRelation]] backed by arrays for consecutive integer keys.


private[joins] object HashedRelation {
private[execution] object HashedRelation {

def apply(
input: Iterator[InternalRow],
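The widened visibility is what lets the new operators in org.apache.spark.sql.execution.local build and probe the relation. A minimal sketch of the call pattern, mirroring HashJoinNode.open() further down in this diff (rowIterator, keyGenerator, and joinKey are stand-ins, not names from the patch):

    val relation: HashedRelation =
      HashedRelation.apply(rowIterator, SQLMetrics.nullLongMetric, keyGenerator)
    // get() returns null, not an empty Seq, when the key has no matches;
    // HashJoinNode.fetchNextMatch() below relies on exactly that.
    val matches: Seq[InternalRow] = relation.get(joinKey)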
sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToSafeNode.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}

case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {

override def output: Seq[Attribute] = child.output

private[this] var convertToSafe: Projection = _

override def open(): Unit = {
child.open()
convertToSafe = FromUnsafeProjection(child.schema)
}

override def next(): Boolean = child.next()

override def fetch(): InternalRow = convertToSafe(child.fetch())

override def close(): Unit = child.close()
}
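Like every operator in this patch, the converter is driven through the open()/next()/fetch()/close() contract documented in LocalNode.scala below. A minimal sketch of consuming it by hand (conf and child are stand-ins for a real SQLConf and an already-constructed child node):

    val node = ConvertToSafeNode(conf, child)
    node.open()                  // opens the child, then builds the FromUnsafeProjection
    while (node.next()) {        // next() simply delegates to the child
      val row: InternalRow = node.fetch()  // child row converted out of the unsafe format
      // consume `row` here; copy() it if it must survive the next fetch()
    }
    node.close()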
sql/core/src/main/scala/org/apache/spark/sql/execution/local/ConvertToUnsafeNode.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}

case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {

override def output: Seq[Attribute] = child.output

private[this] var convertToUnsafe: Projection = _

override def open(): Unit = {
child.open()
convertToUnsafe = UnsafeProjection.create(child.schema)
}

override def next(): Boolean = child.next()

override def fetch(): InternalRow = convertToUnsafe(child.fetch())

override def close(): Unit = child.close()
}
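The two converters are duals of each other, so composing them round-trips a row through the unsafe format; a hedged one-liner (input stands in for any LocalNode producing safe rows):

    val roundTrip = ConvertToSafeNode(conf, ConvertToUnsafeNode(conf, input))
    // driving roundTrip as sketched above yields rows back in the safe representation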
sql/core/src/main/scala/org/apache/spark/sql/execution/local/FilterNode.scala
@@ -17,12 +17,14 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate


case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {
case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
extends UnaryLocalNode(conf) {

private[this] var predicate: (InternalRow) => Boolean = _

sql/core/src/main/scala/org/apache/spark/sql/execution/local/HashJoinNode.scala
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Much of this code is similar to [[org.apache.spark.sql.execution.joins.HashJoin]].
*/
case class HashJoinNode(
conf: SQLConf,
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
buildSide: BuildSide,
left: LocalNode,
right: LocalNode) extends BinaryLocalNode(conf) {

private[this] lazy val (buildNode, buildKeys, streamedNode, streamedKeys) = buildSide match {
case BuildLeft => (left, leftKeys, right, rightKeys)
case BuildRight => (right, rightKeys, left, leftKeys)
}

private[this] var currentStreamedRow: InternalRow = _
private[this] var currentHashMatches: Seq[InternalRow] = _
private[this] var currentMatchPosition: Int = -1

private[this] var joinRow: JoinedRow = _
private[this] var resultProjection: (InternalRow) => InternalRow = _

private[this] var hashed: HashedRelation = _
private[this] var joinKeys: Projection = _

override def output: Seq[Attribute] = left.output ++ right.output

private[this] def isUnsafeMode: Boolean = {
(codegenEnabled && unsafeEnabled
&& UnsafeProjection.canSupport(buildKeys)
&& UnsafeProjection.canSupport(schema))
}

private[this] def buildSideKeyGenerator: Projection = {
if (isUnsafeMode) {
UnsafeProjection.create(buildKeys, buildNode.output)
} else {
newMutableProjection(buildKeys, buildNode.output)()
}
}

private[this] def streamSideKeyGenerator: Projection = {
if (isUnsafeMode) {
UnsafeProjection.create(streamedKeys, streamedNode.output)
} else {
newMutableProjection(streamedKeys, streamedNode.output)()
}
}

override def open(): Unit = {
buildNode.open()
hashed = HashedRelation.apply(
new LocalNodeIterator(buildNode), SQLMetrics.nullLongMetric, buildSideKeyGenerator)

Contributor:
minor: I would prefer for LocalNodeIterator to be hidden outside LocalNode so the separation is cleaner. I'll submit a follow-up patch to do this.
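A hedged sketch of what that follow-up could look like (the asIterator name is hypothetical, not part of this patch):

    // On LocalNode, so callers never name LocalNodeIterator directly:
    def asIterator: Iterator[InternalRow] = new LocalNodeIterator(this)

    // HashJoinNode.open() would then read:
    hashed = HashedRelation.apply(
      buildNode.asIterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator)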

streamedNode.open()
joinRow = new JoinedRow
resultProjection = {
if (isUnsafeMode) {
UnsafeProjection.create(schema)
} else {
identity[InternalRow]
}
}
joinKeys = streamSideKeyGenerator
}

override def next(): Boolean = {
currentMatchPosition += 1
if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
fetchNextMatch()
} else {
true
}

Contributor:
I think the following is functionally equivalent and easier to read:

currentMatchPosition += 1
if (currentHashMatches == null || currentMatchPosition >= currentHashMatches.size) {
  fetchNextMatch()
} else {
  true
}

which says if we don't currently have matches, or we've already joined all of our existing matches, then fetch more matches.

}

/**
* Populate `currentHashMatches` with build-side rows matching the next streamed row.
* @return whether matches are found such that subsequent calls to `fetch` are valid.
*/
private def fetchNextMatch(): Boolean = {

Contributor:
can you add some java docs here:

/**
 * Populate `currentHashMatches` with build-side rows matching the next streamed row.
 * @return whether matches are found such that subsequent calls to `fetch` are valid.
 */

currentHashMatches = null
currentMatchPosition = -1

while (currentHashMatches == null && streamedNode.next()) {
currentStreamedRow = streamedNode.fetch()
val key = joinKeys(currentStreamedRow)
if (!key.anyNull) {
currentHashMatches = hashed.get(key)
}
}

if (currentHashMatches == null) {
false
} else {
currentMatchPosition = 0
true
}
}

override def fetch(): InternalRow = {
val ret = buildSide match {
case BuildRight => joinRow(currentStreamedRow, currentHashMatches(currentMatchPosition))
case BuildLeft => joinRow(currentHashMatches(currentMatchPosition), currentStreamedRow)
}
resultProjection(ret)
}

override def close(): Unit = {
left.close()
right.close()
}
}
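Putting the pieces together, the node is driven like any other LocalNode; a minimal sketch under stated assumptions (leftNode, rightNode, and the key expressions are stand-ins):

    val join = HashJoinNode(conf, leftKeys, rightKeys, BuildRight, leftNode, rightNode)
    join.open()            // hashes the right (build) side, then opens the streamed left side
    while (join.next()) {  // steps through one (streamed row, build match) pair at a time
      val joined: InternalRow = join.fetch()
      // process `joined`; it may be backed by reused buffers, so copy() before caching
    }
    join.close()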
sql/core/src/main/scala/org/apache/spark/sql/execution/local/LimitNode.scala
@@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute


case class LimitNode(limit: Int, child: LocalNode) extends UnaryLocalNode {
case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {

private[this] var count = 0

sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -17,9 +17,13 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.Row
import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.sql.{SQLConf, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.StructType

@@ -29,7 +33,15 @@ import org.apache.spark.sql.types.StructType
* Before consuming the iterator, the open function must be called.
* After consuming the iterator, the close function must be called.
*/
abstract class LocalNode extends TreeNode[LocalNode] {
abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging {

protected val codegenEnabled: Boolean = conf.codegenEnabled

protected val unsafeEnabled: Boolean = conf.unsafeEnabled

lazy val schema: StructType = StructType.fromAttributes(output)

private[this] lazy val isTesting: Boolean = sys.props.contains("spark.testing")

def output: Seq[Attribute]

@@ -73,17 +85,78 @@ abstract class LocalNode extends TreeNode[LocalNode] {
}
result
}

protected def newMutableProjection(
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)

Contributor:
minor: why not just logError? same in L90 (just wondering)

Member Author:
Not sure. I saw many places in SQL use them, so I just followed the style.

() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}

}


abstract class LeafLocalNode extends LocalNode {
abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {
override def children: Seq[LocalNode] = Seq.empty
}


abstract class UnaryLocalNode extends LocalNode {
abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {

def child: LocalNode

override def children: Seq[LocalNode] = Seq(child)
}

abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {

def left: LocalNode

def right: LocalNode

override def children: Seq[LocalNode] = Seq(left, right)
}

/**
* A thin wrapper around a [[LocalNode]] that provides an `Iterator` interface.
*/
private[local] class LocalNodeIterator(localNode: LocalNode) extends Iterator[InternalRow] {

Contributor:
we should add tests for this :) Let's do it in a follow-up patch.

private var nextRow: InternalRow = _

override def hasNext: Boolean = {
if (nextRow == null) {
val res = localNode.next()
if (res) {
nextRow = localNode.fetch()
}
res
} else {
true
}
}

override def next(): InternalRow = {
if (hasNext) {
val res = nextRow
nextRow = null
res
} else {
throw new NoSuchElementException
}
}
}
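Picking up the review comment above about test coverage, a hedged sketch of the kind of test that could exercise LocalNodeIterator (DummyNode is a hypothetical leaf node that replays a fixed row sequence; it and `attributes` are not part of this patch):

    val rows = (1 to 3).map(i => InternalRow(i))
    val node = new DummyNode(conf, attributes, rows)  // hypothetical test-only leaf
    node.open()
    val iter = new LocalNodeIterator(node)
    assert(iter.size == rows.size)  // size drains the iterator, exercising next()/fetch()
    assert(!iter.hasNext)           // an exhausted iterator must stay exhausted
    node.close()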
sql/core/src/main/scala/org/apache/spark/sql/execution/local/ProjectNode.scala
@@ -17,11 +17,13 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression}


case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {
case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
extends UnaryLocalNode(conf) {

private[this] var project: UnsafeProjection = _
