Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ class CodegenContext {
*/
val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()

/**
* Add an object to `references`, create a class member to access it.
*
* Returns the name of class member.
*/
def addReferenceObj(name: String, obj: Any, className: String = null): String = {
val term = freshName(name)
val idx = references.length
references += obj
val clsName = Option(className).getOrElse(obj.getClass.getName)
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
term
}

/**
* Holding a list of generated columns as input of current operator, will be used by
* BoundReference to generate code.
Expand Down Expand Up @@ -198,6 +212,39 @@ class CodegenContext {
}
}

/**
* Update a column in MutableRow from ExprCode.
*/
def updateColumn(
row: String,
dataType: DataType,
ordinal: Int,
ev: ExprCode,
nullable: Boolean): String = {
if (nullable) {
// Can't call setNullAt on DecimalType, because we need to keep the offset
if (dataType.isInstanceOf[DecimalType]) {
s"""
if (!${ev.isNull}) {
${setColumn(row, dataType, ordinal, ev.value)};
} else {
${setColumn(row, dataType, ordinal, "null")};
}
"""
} else {
s"""
if (!${ev.isNull}) {
${setColumn(row, dataType, ordinal, ev.value)};
} else {
$row.setNullAt($ordinal);
}
"""
}
} else {
s"""${setColumn(row, dataType, ordinal, ev.value)};"""
}
}

/**
* Returns the name used in accessor and setter for a Java primitive type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,31 +88,8 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu

val updates = validExpr.zip(index).map {
case (e, i) =>
if (e.nullable) {
if (e.dataType.isInstanceOf[DecimalType]) {
// Can't call setNullAt on DecimalType, because we need to keep the offset
s"""
if (this.isNull_$i) {
${ctx.setColumn("mutableRow", e.dataType, i, "null")};
} else {
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
}
"""
} else {
s"""
if (this.isNull_$i) {
mutableRow.setNullAt($i);
} else {
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
}
"""
}
} else {
s"""
${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
"""
}

val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}

val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution;

import java.io.IOException;

import scala.collection.Iterator;

import org.apache.spark.sql.catalyst.InternalRow;
Expand All @@ -34,7 +36,7 @@ public class BufferedRowIterator {
// used when there is no column in output
protected UnsafeRow unsafeRow = new UnsafeRow(0);

public boolean hasNext() {
public boolean hasNext() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd you change this to throw exceptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The KVIterator will throws IOException, we need to propagate it here.

if (currentRow == null) {
processNext();
}
Expand All @@ -56,7 +58,7 @@ public void setInput(Iterator<InternalRow> iter) {
*
* After it's called, if currentRow is still null, it means no more rows left.
*/
protected void processNext() {
protected void processNext() throws IOException {
if (input.hasNext()) {
currentRow = input.next();
}
Expand Down
Loading