Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, FromUnsafeProjection, Projection}

/**
 * A local operator that passes every row fetched from `child` through a
 * `FromUnsafeProjection` built from the child's output data types.
 *
 * It advertises that it accepts UnsafeRows as input, does not accept
 * Java-object-based rows, and does not itself output UnsafeRows.
 */
case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {

  /** Projection applied in `fetch()`; assigned by `open()` and unset before that. */
  private[this] var safeProjection: Projection = _

  override def output: Seq[Attribute] = child.output

  override def outputsUnsafeRows: Boolean = false

  override def canProcessUnsafeRows: Boolean = true

  override def canProcessSafeRows: Boolean = false

  override def open(): Unit = {
    // The child is opened first, then the projection is built from its column types.
    child.open()
    val childTypes = child.output.map(attr => attr.dataType)
    safeProjection = FromUnsafeProjection(childTypes)
  }

  override def next(): Boolean = {
    child.next()
  }

  override def fetch(): InternalRow = {
    val childRow = child.fetch()
    safeProjection(childRow)
  }

  override def close(): Unit = {
    child.close()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Projection, UnsafeProjection}

/**
 * A local operator that passes every row fetched from `child` through an
 * `UnsafeProjection` created from the child's schema.
 *
 * It advertises that it accepts Java-object-based rows as input, does not
 * accept UnsafeRows, and outputs UnsafeRows.
 */
case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf) {

  /** Projection applied in `fetch()`; assigned by `open()` and unset before that. */
  private[this] var unsafeProjection: Projection = _

  override def output: Seq[Attribute] = child.output

  override def outputsUnsafeRows: Boolean = true

  override def canProcessUnsafeRows: Boolean = false

  override def canProcessSafeRows: Boolean = true

  override def open(): Unit = {
    // The child is opened first, then the projection is built from its schema.
    child.open()
    val childSchema = child.schema
    unsafeProjection = UnsafeProjection.create(childSchema)
  }

  override def next(): Boolean = {
    child.next()
  }

  override def fetch(): InternalRow = {
    val childRow = child.fetch()
    unsafeProjection(childRow)
  }

  override def close(): Unit = {
    child.close()
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}

case class ExpandNode(
conf: SQLConf,
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: LocalNode) extends UnaryLocalNode(conf) {

assert(projections.size > 0)

private[this] var result: InternalRow = _
private[this] var idx: Int = _
private[this] var input: InternalRow = _

private[this] var groups: Array[Projection] = _

/**
 * Opens the child and builds one [[Projection]] per entry of `projections`,
 * each bound to the child's output attributes.
 */
override def open(): Unit = {
  child.open()
  groups = projections.map(exprs => newProjection(exprs, child.output)).toArray
  // Start past the last projection. `next()` increments `idx` before testing
  // `idx < groups.length`, so starting at -1 would make the very first call apply
  // groups(0) to `input` while it is still null. Starting at `groups.length` forces
  // the first call to pull a row from the child before any projection is evaluated.
  idx = groups.length
}

/**
 * Advances to the next output row and stores it in `result`.
 *
 * `idx` is incremented first. While it is still below `groups.length`, the
 * projection at that index is applied to the current `input`. Otherwise the
 * child is asked for its next row; if it has one, that row becomes `input`,
 * `idx` is reset to 0 and the first projection is applied. Returns false when
 * the child has no more rows.
 */
override def next(): Boolean = {
idx += 1
if (idx < groups.length) {
// `input` is only assigned in the `child.next()` branch below, so this branch
// relies on a child row having been fetched by an earlier call.
result = groups(idx)(input)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm, at this point input hasn't been initialized yet has it? Did you intend to pass null into the projection?

true
} else if (child.next()) {
input = child.fetch()
idx = 0
result = groups(idx)(input)
true
} else {
false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about:

if (idx < 0 || idx >= groups.length) {
  if (child.next()) {
    input = child.fetch()
    idx = 0
  } else {
    return false
  }
}
result = groups(idx)(input)
idx += 1
true

a little less duplication

}

/** Returns the row most recently stored in `result` by `next()`. */
override def fetch(): InternalRow = result

/** Closes the child; this node does nothing else on close. */
override def close(): Unit = child.close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate


case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {
case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
extends UnaryLocalNode(conf) {

private[this] var predicate: (InternalRow) => Boolean = _

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute


case class LimitNode(limit: Int, child: LocalNode) extends UnaryLocalNode {
case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf) {

private[this] var count = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.Row
import org.apache.spark.Logging
import org.apache.spark.sql.{SQLConf, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate, GenerateMutableProjection, GenerateProjection}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.StructType

Expand All @@ -29,10 +31,18 @@ import org.apache.spark.sql.types.StructType
* Before consuming the iterator, open function must be called.
* After consuming the iterator, close function must be called.
*/
abstract class LocalNode extends TreeNode[LocalNode] {
abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging {

val codegenEnabled: Boolean = conf.codegenEnabled

val unsafeEnabled: Boolean = conf.unsafeEnabled

private[this] def isTesting: Boolean = sys.props.contains("spark.testing")

def output: Seq[Attribute]

lazy val schema: StructType = StructType.fromAttributes(output)

/**
* Initializes the iterator state. Must be called before calling `next()`.
*
Expand All @@ -57,6 +67,18 @@ abstract class LocalNode extends TreeNode[LocalNode] {
*/
def close(): Unit

/** Specifies whether this operator outputs UnsafeRows */
def outputsUnsafeRows: Boolean = false

/** Specifies whether this operator is capable of processing UnsafeRows */
def canProcessUnsafeRows: Boolean = false

/**
* Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
* that are not UnsafeRows).
*/
def canProcessSafeRows: Boolean = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how many of these can be protected?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods will be used outside of LocalNode, for example when building a LocalNode tree, so I didn't make them protected.


/**
* Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
*/
Expand All @@ -73,17 +95,112 @@ abstract class LocalNode extends TreeNode[LocalNode] {
}
result
}

/**
 * Returns a factory that creates a [[MutableProjection]] evaluating `expressions`
 * against rows laid out as `inputSchema`.
 *
 * When `codegenEnabled` is true the projection is produced by
 * `GenerateMutableProjection`. If that throws an `Exception`, it is rethrown when
 * the `spark.testing` system property is set; otherwise the error is logged and an
 * `InterpretedMutableProjection` factory is returned instead. When `codegenEnabled`
 * is false the interpreted factory is returned directly.
 */
protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): () => MutableProjection = {
  // `logDebug` takes its message by name, so the interpolated string (which renders
  // every expression) is only built when debug logging is actually enabled.
  logDebug(
    s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
  if (codegenEnabled) {
    try {
      GenerateMutableProjection.generate(expressions, inputSchema)
    } catch {
      case e: Exception =>
        if (isTesting) {
          // Surface code generation failures in tests instead of hiding them behind a fallback.
          throw e
        } else {
          logError("Failed to generate mutable projection, fallback to interpreted", e)
          () => new InterpretedMutableProjection(expressions, inputSchema)
        }
    }
  } else {
    () => new InterpretedMutableProjection(expressions, inputSchema)
  }
}

/**
 * Creates a [[Projection]] evaluating `expressions` against rows laid out as
 * `inputSchema`.
 *
 * When `codegenEnabled` is true the projection is produced by `GenerateProjection`.
 * If that throws an `Exception`, it is rethrown when the `spark.testing` system
 * property is set; otherwise the error is logged and an `InterpretedProjection` is
 * returned instead. When `codegenEnabled` is false the interpreted projection is
 * returned directly.
 */
protected def newProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute]): Projection = {
  // `logDebug` takes its message by name, so the interpolated string (which renders
  // every expression) is only built when debug logging is actually enabled.
  logDebug(
    s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
  if (codegenEnabled) {
    try {
      GenerateProjection.generate(expressions, inputSchema)
    } catch {
      case e: Exception =>
        if (isTesting) {
          // Surface code generation failures in tests instead of hiding them behind a fallback.
          throw e
        } else {
          logError("Failed to generate projection, fallback to interpret", e)
          new InterpretedProjection(expressions, inputSchema)
        }
    }
  } else {
    new InterpretedProjection(expressions, inputSchema)
  }
}

/**
 * Builds a row predicate for `expression` bound to `inputSchema`.
 *
 * When `codegenEnabled` is true the predicate comes from `GeneratePredicate`. If
 * that throws an `Exception`, it is rethrown when the `spark.testing` system
 * property is set; otherwise the error is logged and the predicate from
 * `InterpretedPredicate.create` is returned. When `codegenEnabled` is false the
 * interpreted predicate is returned directly.
 */
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: can you put inputSchema on new line, same in L107

if (codegenEnabled) {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
// Rethrow under test so code generation failures are not hidden by the fallback.
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
} else {
InterpretedPredicate.create(expression, inputSchema)
}
}

/**
 * Exposes this node as a Scala `Iterator` over the rows it produces.
 *
 * `hasNext` advances the node with `next()` and buffers the row returned by
 * `fetch()`; `next()` hands out the buffered row and clears the buffer. The
 * iterator does not call `open()` or `close()` on the node.
 *
 * Calling `next()` without a preceding `hasNext` is supported, and calling it
 * when no rows remain throws `java.util.NoSuchElementException`.
 */
def toIterator: Iterator[InternalRow] = new Iterator[InternalRow] {

  // Row fetched by `hasNext` that has not been handed out yet; null when nothing is buffered.
  private var currentRow: InternalRow = null

  override def hasNext: Boolean = {
    if (currentRow != null) {
      // A row is already buffered; do not advance the node again.
      true
    } else if (LocalNode.this.next()) {
      currentRow = fetch()
      true
    } else {
      false
    }
  }

  override def next(): InternalRow = {
    // Drive `hasNext` here so the buffer is filled even if the caller skipped it,
    // and so exhaustion is reported as an exception rather than a null row.
    if (!hasNext) {
      throw new java.util.NoSuchElementException("next on empty iterator")
    }
    val row = currentRow
    currentRow = null
    row
  }
}
}


abstract class LeafLocalNode extends LocalNode {
/** Base class for local nodes that have no children. */
abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf) {

  /** Always empty: a leaf has no child nodes. */
  override def children: Seq[LocalNode] = Nil
}


abstract class UnaryLocalNode extends LocalNode {
/** Base class for local nodes that have exactly one child. */
abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf) {

  /** The single input node. */
  def child: LocalNode

  /** A one-element sequence holding `child`. */
  override def children: Seq[LocalNode] = child :: Nil
}

/** Base class for local nodes that have exactly two children. */
abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf) {

  /** The first input node. */
  def left: LocalNode

  /** The second input node. */
  def right: LocalNode

  /** Both inputs, `left` first and `right` second. */
  override def children: Seq[LocalNode] = left :: right :: Nil
}
Loading