Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public final CalendarInterval getInterval(int rowId) {
/**
* @return child [[ColumnVector]] at the given ordinal.
*/
protected abstract ColumnVector getChild(int ordinal);
public abstract ColumnVector getChild(int ordinal);

/**
* Data type for this column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* the entire data loading process.
*/
@Evolving
public final class ColumnarBatch {
public final class ColumnarBatch implements AutoCloseable {
private int numRows;
private final ColumnVector[] columns;

Expand All @@ -42,6 +42,7 @@ public final class ColumnarBatch {
* Called to close all the columns in this batch. It is not valid to access the data after
* calling this. This must be called at the end to clean up memory allocations.
*/
@Override
public void close() {
for (ColumnVector c: columns) {
c.close();
Expand Down Expand Up @@ -110,7 +111,17 @@ public InternalRow getRow(int rowId) {
}

public ColumnarBatch(ColumnVector[] columns) {
this(columns, 0);
}

/**
* Create a new batch from existing column vectors.
* @param columns The columns of this batch
* @param numRows The number of rows in this batch
*/
public ColumnarBatch(ColumnVector[] columns, int numRows) {
this.columns = columns;
this.numRows = numRows;
this.row = new ColumnarBatchRow(columns);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,10 @@ public final int appendArray(int length) {
*/
public final int appendStruct(boolean isNull) {
if (isNull) {
appendNull();
// This is the same as appendNull but without the assertion for struct types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because appendNull itself has an assertion that you are not appending a struct. So any call to appendStruct with isNull true would have failed.

reserve(elementsAppended + 1);
putNull(elementsAppended);
elementsAppended++;
for (WritableColumnVector c: childColumns) {
if (c.type instanceof StructType) {
c.appendStruct(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.ColumnarRule

/**
* :: Experimental ::
Expand All @@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
* <li>Planning Strategies.</li>
* <li>Customized Parser.</li>
* <li>(External) Catalog listeners.</li>
* <li>Columnar Rules.</li>
* </ul>
*
* The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
Expand Down Expand Up @@ -93,6 +95,23 @@ class SparkSessionExtensions {
type StrategyBuilder = SparkSession => Strategy
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
type FunctionDescription = (FunctionIdentifier, ExpressionInfo, FunctionBuilder)
type ColumnarRuleBuilder = SparkSession => ColumnarRule

private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]

/**
* Build the override rules for columnar execution.
*/
private[sql] def buildColumnarRules(session: SparkSession): Seq[ColumnarRule] = {
columnarRuleBuilders.map(_.apply(session))
}

/**
* Inject a rule that can override the columnar execution of an executor.
*/
def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
columnarRuleBuilders += builder
}

private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]

Expand Down
Loading