Skip to content

Commit 0cc6dfd

Browse files
ahshahidSumedh Wale
authored andcommitted
[SNAPPYDATA] Bootstrap perf (#16)
Change involves: 1) Reducing the generated code size when writing struct having all fields of same data type. 2) Fixing an issue in WholeStageCodeGenExec, where a plan supporting CodeGen was not being prefixed by InputAdapter in case, the node did not participate in whole stage code gen.
1 parent e2b6084 commit 0cc6dfd

File tree

3 files changed

+80
-7
lines changed

3 files changed

+80
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,43 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
5252
ctx.addMutableState("Object[]", values, s"this.$values = null;")
5353

5454
val rowClass = classOf[GenericInternalRow].getName
55-
56-
val fieldWriters = schema.map(_.dataType).zipWithIndex.map { case (dt, i) =>
57-
val converter = convertToSafe(ctx, ctx.getValue(tmp, dt, i.toString), dt)
55+
val isHomogenousStruct = {
56+
var i = 1
57+
val ref = ctx.javaType(schema.fields(0).dataType)
58+
var broken = false || !ctx.isPrimitiveType(ref) || schema.length <=1
59+
while( !broken && i < schema.length) {
60+
if(ctx.javaType(schema.fields(i).dataType) != ref) {
61+
broken = true
62+
}
63+
i +=1
64+
}
65+
!broken
66+
}
67+
val allFields = if(isHomogenousStruct) {
68+
val counter = ctx.freshName("counter")
69+
val converter = convertToSafe(ctx, ctx.getValue(tmp, schema.fields(0).dataType, counter), schema.fields(0).dataType)
5870
s"""
71+
for(int $counter = 0; $counter < ${schema.length}; ++$counter) {
72+
if (!$tmp.isNullAt($counter)) {
73+
${converter.code}
74+
$values[$counter] = ${converter.value};
75+
}
76+
}
77+
"""
78+
79+
}else {
80+
val fieldWriters = schema.map(_.dataType).zipWithIndex.map { case (dt, i) =>
81+
val converter = convertToSafe(ctx, ctx.getValue(tmp, dt, i.toString), dt)
82+
s"""
5983
if (!$tmp.isNullAt($i)) {
6084
${converter.code}
6185
$values[$i] = ${converter.value};
6286
}
6387
"""
88+
}
89+
ctx.splitExpressions(tmp, fieldWriters)
6490
}
65-
val allFields = ctx.splitExpressions(tmp, fieldWriters)
91+
6692
val code = s"""
6793
final InternalRow $tmp = $input;
6894
this.$values = new Object[${schema.length}];

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,56 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
109109

110110
val writeField = dt match {
111111
case t: StructType =>
112-
s"""
112+
val isHomogenousStruct = {
113+
var i = 1
114+
val ref = ctx.javaType(t.fields(0).dataType)
115+
var broken = false || !ctx.isPrimitiveType(ref) || t.length <=1
116+
while( !broken && i < t.length) {
117+
if(ctx.javaType(t.fields(i).dataType) != ref) {
118+
broken = true
119+
}
120+
i +=1
121+
}
122+
!broken
123+
}
124+
if(isHomogenousStruct) {
125+
val counter = ctx.freshName("counter")
126+
val rowWriterChild = ctx.freshName("rowWriterChild")
127+
128+
s"""
113129
// Remember the current cursor so that we can calculate how many bytes are
114130
// written later.
131+
132+
final int $tmpCursor = $bufferHolder.cursor;
133+
134+
if (${input.value} instanceof UnsafeRow) {
135+
${writeUnsafeData(ctx, s"((UnsafeRow) ${input.value})", bufferHolder)};
136+
} else {
137+
$rowWriterClass $rowWriterChild = new $rowWriterClass($bufferHolder, ${t.length});
138+
$rowWriterChild.reset();
139+
for(int $counter = 0; $counter < ${t.length}; ++$counter) {
140+
if (${input.value}.isNullAt($index)) {
141+
$rowWriterChild.setNullAt($index);
142+
}else {
143+
$rowWriterChild.write($counter, ${ctx.getValue(input.value, t.fields(0).dataType,
144+
counter)});
145+
}
146+
}
147+
}
148+
$rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
149+
"""
150+
151+
152+
}else {
153+
s"""
154+
// Remember the current cursor so that we can calculate how many bytes are
155+
// written later.
156+
115157
final int $tmpCursor = $bufferHolder.cursor;
116158
${writeStructToBuffer(ctx, input.value, t.map(_.dataType), bufferHolder)}
117159
$rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
118160
"""
161+
}
119162

120163
case a @ ArrayType(et, _) =>
121164
s"""

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,14 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
487487
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = plan match {
488488
// For operators that will output domain object, do not insert WholeStageCodegen for it as
489489
// domain object can not be written into unsafe row.
490-
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
490+
case plan if plan.output.length == 1 &&
491+
plan.output.head.dataType.isInstanceOf[ObjectType] =>
491492
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
492-
case plan: CodegenSupport if supportCodegen(plan) =>
493+
case plan: CodegenSupport => if (supportCodegen(plan)) {
493494
WholeStageCodegenExec(insertInputAdapter(plan))
495+
} else {
496+
plan.withNewChildren(plan.children.map(insertInputAdapter))
497+
}
494498
case other =>
495499
other.withNewChildren(other.children.map(insertWholeStageCodegen))
496500
}

0 commit comments

Comments
 (0)