
Conversation

@JoshRosen (Contributor) commented on Sep 10, 2019

⚠️ I've marked this PR as WIP pending benchmarking (to verify that it's actually faster) and time to think about possible corner-cases. ⚠️

What changes were proposed in this pull request?

Spark 2.x has two separate implementations of the "create named struct" expression: regular CreateNamedStruct and CreateNamedStructUnsafe. The former produces GenericInternalRows, while the latter produces UnsafeRows. Both expressions extend the CreateNamedStructLike trait.

The "unsafe" version was added in SPARK-9373 / #7689 to support structs in GenerateUnsafeProjection (this was fairly early in the Tungsten effort, circa mid-2015 / Spark 1.5.x).

This PR changes Spark so the UnsafeRow-based codepath is always used and removes the GenericRow-based path. For ease-of-review, I've broken this into two commits:

  • The first commit changes CreateNamedStruct to use the CreateNamedStructUnsafe implementation (producing UnsafeRow) and deletes CreateNamedStructUnsafe.
  • The second commit removes the CreateNamedStructLike trait (since at that point it has only a single implementation).

Why are the changes needed?

The old CreateNamedStruct code path allocated a fresh GenericInternalRow on every invocation, incurring object allocation and primitive-boxing performance overheads.
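As a rough, hedged sketch (not the actual generated code) of what that path amounts to for struct(id, id) over a Long column:

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    val values = new Array[Any](2)
    values(0) = 42L   // the primitive Long is boxed into a java.lang.Long here
    values(1) = 42L   // boxed again
    val row = new GenericInternalRow(values)   // a fresh row object per input row

The UnsafeRowWriter-based codepath writes the primitives directly into a reused buffer, so both the boxing and the per-row allocation go away.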

I suspect that this can be a significant performance problem in code which uses Datasets: Spark's ExpressionEncoder uses CreateNamedStruct for serializing / deserializing case classes, so the GenericInternalRow performance problems can impact typed Dataset operations (e.g. .map()).
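As an illustrative example (the case class and pipeline below are hypothetical, but the shape is typical), code like the following goes through an Encoder whose serializer is built from CreateNamedStruct:

    // run in spark-shell, where spark.implicits._ is in scope
    case class Point(x: Long, y: Long)

    val ds = Seq(Point(1L, 2L), Point(3L, 4L)).toDS()   // serializer from ExpressionEncoder[Point]
    ds.map(p => Point(p.x + 1, p.y)).count()            // each map re-serializes via the struct expression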

I spotted this while doing a deep-dive into Encoder-generated code.

I think there's also code-simplification benefits: if we only have a single implementation of CreateNamedStruct then we're removing the possibility for bugs in case code authors pattern-match on concrete classes instead of the CreateNamedStructLike trait.
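As a sketch of that pitfall (the helper below is hypothetical, purely to illustrate the pattern-match):

    import org.apache.spark.sql.catalyst.expressions.{CreateNamedStruct, Expression}

    // Matching on the concrete class instead of the shared CreateNamedStructLike
    // trait silently ignores the "unsafe" variant.
    def isStructCreation(e: Expression): Boolean = e match {
      case _: CreateNamedStruct => true    // handled
      case _                    => false   // CreateNamedStructUnsafe falls through here
    }

With a single implementation there is no second concrete class to forget.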

Does this PR introduce any user-facing change?

There is no expected user-facing behavioral change.

However, there is a binary-compatibility-breaking change due to the deletion of CreateNamedStructUnsafe and CreateNamedStructLike. I suspect that these were originally intended to be internal / private classes, so I propose that this is an acceptable breaking change in Spark 3.x (it seems pretty unlikely that Spark users are directly using those internal interfaces).

How was this patch tested?

I believe this is covered by Spark's existing tests: CreateNamedStructUnsafe is already used in GenerateUnsafeProjection, so its codepath has pre-existing test coverage.

@JoshRosen changed the title to [SPARK-29033][SQL][WIP] Always use UnsafeRow-based version of CreateNamedStruct on Sep 10, 2019
case (result: Float, expected: Float) =>
if (expected.isNaN) result.isNaN else expected == result
case (result: Row, expected: InternalRow) => result.toSeq == expected.toSeq(result.schema)
case (result: Seq[InternalRow], expected: Seq[InternalRow]) =>
@JoshRosen (Contributor, Author) commented on this diff:

This change was needed because we can't do direct equals() comparison between UnsafeRow and other row classes. After this PR's changes, the "SPARK-14793: split wide struct creation into blocks due to JVM code size limit" test case in CodeGenerationSuite was failing because the new code was producing UnsafeRow but the test code was comparing against GenericInternalRow. In the old code, this comparison between sequences of rows was happening in the default case _ => below, but that case doesn't work when the InternalRow implementations are mismatched.

I'm not sure whether this change-of-internal-row-format will have adverse consequences in other parts of the code.
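For context, here is a minimal sketch of the mismatch (real Catalyst classes, illustrative values): UnsafeRow.equals compares the binary representation, so it never equals a GenericInternalRow holding the same values, whereas a field-by-field toSeq(schema) comparison works for both.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(StructField("a", LongType), StructField("b", LongType)))
    val generic: InternalRow = new GenericInternalRow(Array[Any](1L, 2L))
    val unsafe: InternalRow = UnsafeProjection.create(schema).apply(generic)

    unsafe == generic                              // false: different row representations
    unsafe.toSeq(schema) == generic.toSeq(schema)  // true: same field values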

@JoshRosen (Contributor, Author) commented:

As a high-level illustration of why I think this might improve performance, compare the following before-and-after excerpts from running the following in spark-shell:

import org.apache.spark.sql.execution.debug._
spark.sql("select struct(id, id) from range(100)").debugCodegen

Before:

/* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */     partitionIndex = index;
/* 023 */     this.inputs = inputs;
/* 024 */
/* 025 */     range_taskContext_0 = TaskContext.get();
/* 026 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 030 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_mutableStateArray_0[2], 2);
/* 031 */
/* 032 */   }
/* 033 */
/* 034 */   private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 035 */     Object[] project_values_0 = new Object[2];
/* 036 */
/* 037 */     if (false) {
/* 038 */       project_values_0[0] = null;
/* 039 */     } else {
/* 040 */       project_values_0[0] = project_expr_0_0;
/* 041 */     }
/* 042 */
/* 043 */     if (false) {
/* 044 */       project_values_0[1] = null;
/* 045 */     } else {
/* 046 */       project_values_0[1] = project_expr_0_0;
/* 047 */     }
/* 048 */
/* 049 */     final InternalRow project_value_1 = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(project_values_0);
/* 050 */     project_values_0 = null;
/* 051 */     range_mutableStateArray_0[2].reset();
/* 052 */
/* 053 */     final InternalRow project_tmpInput_0 = project_value_1;
/* 054 */     if (project_tmpInput_0 instanceof UnsafeRow) {
/* 055 */       range_mutableStateArray_0[2].write(0, (UnsafeRow) project_tmpInput_0);
/* 056 */     } else {
/* 057 */       // Remember the current cursor so that we can calculate how many bytes are
/* 058 */       // written later.
/* 059 */       final int project_previousCursor_0 = range_mutableStateArray_0[2].cursor();
/* 060 */
/* 061 */       range_mutableStateArray_0[3].resetRowWriter();
/* 062 */
/* 063 */       range_mutableStateArray_0[3].write(0, (project_tmpInput_0.getLong(0)));
/* 064 */
/* 065 */       range_mutableStateArray_0[3].write(1, (project_tmpInput_0.getLong(1)));
/* 066 */
/* 067 */       range_mutableStateArray_0[2].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_0);
/* 068 */     }
/* 069 */     append((range_mutableStateArray_0[2].getRow()));
/* 070 */
/* 071 */   }

After:

/* 021 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 022 */     partitionIndex = index;
/* 023 */     this.inputs = inputs;
/* 024 */
/* 025 */     range_taskContext_0 = TaskContext.get();
/* 026 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 027 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 028 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 030 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 031 */     range_mutableStateArray_0[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(range_mutableStateArray_0[3], 2);
/* 032 */
/* 033 */   }
/* 034 */
/* 035 */   private void project_doConsume_0(long project_expr_0_0) throws java.io.IOException {
/* 036 */     range_mutableStateArray_0[2].reset();
/* 037 */
/* 038 */     range_mutableStateArray_0[2].write(0, project_expr_0_0);
/* 039 */
/* 040 */     range_mutableStateArray_0[2].write(1, project_expr_0_0);
/* 041 */     range_mutableStateArray_0[3].reset();
/* 042 */
/* 043 */     final InternalRow project_tmpInput_0 = (range_mutableStateArray_0[2].getRow());
/* 044 */     if (project_tmpInput_0 instanceof UnsafeRow) {
/* 045 */       range_mutableStateArray_0[3].write(0, (UnsafeRow) project_tmpInput_0);
/* 046 */     } else {
/* 047 */       // Remember the current cursor so that we can calculate how many bytes are
/* 048 */       // written later.
/* 049 */       final int project_previousCursor_0 = range_mutableStateArray_0[3].cursor();
/* 050 */
/* 051 */       range_mutableStateArray_0[4].resetRowWriter();
/* 052 */
/* 053 */       range_mutableStateArray_0[4].write(0, (project_tmpInput_0.getLong(0)));
/* 054 */
/* 055 */       range_mutableStateArray_0[4].write(1, (project_tmpInput_0.getLong(1)));
/* 056 */
/* 057 */       range_mutableStateArray_0[3].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_0);
/* 058 */     }
/* 059 */     append((range_mutableStateArray_0[3].getRow()));
/* 060 */
/* 061 */   }

@SparkQA commented on Sep 10, 2019

Test build #110424 has finished for PR 25745 at commit d3925bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) commented:

Retest this please.

@joshrosen-stripe (Contributor) commented:

While we're at it, there could be significant wins from eliminating the if (project_tmpInput_0 instanceof UnsafeRow) checks: if we can always take the UnsafeRow branch here, the generated code shrinks considerably, since most of it lives in the else branch. That would help especially for wide Encoder projections, addressing a performance issue I've seen in the wild: generated code too large to JIT.

*/
case class CreateNamedStructUnsafe(children: Seq[Expression]) extends CreateNamedStructLike {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val eval = GenerateUnsafeProjection.createCode(ctx, valExprs)
A reviewer (Member) commented on this diff:

Are there any types GenerateUnsafeProjection doesn't support? From GenerateUnsafeProjection.canSupport, it looks like there are none.

@SparkQA commented on Sep 17, 2019

Test build #110643 has finished for PR 25745 at commit d3925bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor) commented:

Just for reference, we are considering doing the opposite: removing CreateNamedStructUnsafe, since it causes issues with MapObjects in some cases. See #26173.

@JoshRosen (Contributor, Author) commented:

@HeartSaVioR, thanks for the link. I'm closing this PR and will resolve my JIRA as "Won't Fix".

@JoshRosen closed this on Oct 26, 2019
@HyukjinKwon (Member) commented:

@HeartSaVioR, if this improves performance in general, we might instead have to revert #26173 and take this change, since apparently that PR does not have any user-facing change. Can you clarify at #26173 (comment)?
