@@ -199,7 +199,7 @@ case class ArraysZip(children: Seq[Expression]) extends Expression with ExpectsI
""".stripMargin
}

- val splittedGetValuesAndCardinalities = ctx.splitExpressions(
+ val splittedGetValuesAndCardinalities = ctx.splitExpressionsWithCurrentInputs(
@viirya (Member), Jun 23, 2018

Calling splitExpressions directly here has two problems:

  1. splitExpressions does not work under whole-stage codegen.
  2. Without whole-stage codegen, the current input row is not included in the arguments.
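
To make the two problems concrete, here is a minimal, self-contained model of the choice that splitExpressionsWithCurrentInputs appears to make. This is a sketch under stated assumptions, not Spark's code: `ToyCodegenContext`, `SplitPlan`, and `ToySplitter` are hypothetical names, and the assumption that "current vars are set" signals whole-stage codegen is a simplification.

```scala
// Toy model only: none of these types exist in Spark.
// inputRow models the name of the current input row variable (if any);
// currentVarsSet is ASSUMED to mean that whole-stage codegen is active.
final case class ToyCodegenContext(inputRow: Option[String], currentVarsSet: Boolean)

sealed trait SplitPlan
// Emit all code inline in the calling method, with no helper functions.
case object Inline extends SplitPlan
// Emit helper functions that receive exactly these (type, name) arguments.
final case class Split(arguments: Seq[(String, String)]) extends SplitPlan

object ToySplitter {
  // Models the "with current inputs" variant: split only when an input row
  // exists and whole-stage codegen is off, and always pass the row along.
  def planWithCurrentInputs(
      ctx: ToyCodegenContext,
      extraArguments: Seq[(String, String)]): SplitPlan =
    ctx.inputRow match {
      case Some(row) if !ctx.currentVarsSet =>
        Split(("InternalRow", row) +: extraArguments)
      case _ =>
        Inline
    }

  // Models a direct call with explicit arguments: it always splits and
  // passes only what the caller listed, so the input row can be left out.
  def planDirect(arguments: Seq[(String, String)]): SplitPlan = Split(arguments)
}
```

In this model, `planDirect` with only the array and cardinality arguments yields helpers that never see the input row (problem 2) and splits even when whole-stage codegen is active (problem 1), while `planWithCurrentInputs` avoids both.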

Contributor Author

Yes, you're 100% right. That's why the unit test I added covers both cases.

expressions = getValuesAndCardinalities,
funcName = "getValuesAndCardinalities",
returnType = "int",
@@ -209,7 +209,7 @@ case class ArraysZip(children: Seq[Expression]) extends Expression with ExpectsI
|return $biggestCardinality;
""".stripMargin,
foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"),
- arguments =
+ extraArguments =
("ArrayData[]", arrVals) ::
("int", biggestCardinality) :: Nil)

@@ -556,6 +556,17 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(df8.selectExpr("arrays_zip(v1, v2)"), expectedValue8)
}

test("SPARK-24633: arrays_zip splits input processing correctly") {
Seq("true", "false").foreach { wholestageCodegenEnabled =>
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholestageCodegenEnabled) {
val df = spark.range(1)
val exprs = (0 to 5).map(x => array($"id" + lit(x)))
Contributor

This wasn't splitting input processing for me. Maybe splitExpressionsWithCurrentInputs has a bigger threshold than splitExpressions.

Even at 90, it still had not split the input processing. At 100, it finally did. So someplace between 90 and 100, it starts splitting.

I might be looking at the wrong thing. Check at your end.

        val exprs = (0 to 100).map(x => array($"id" + lit(x)))
        checkAnswer(df.select(arrays_zip(exprs: _*)),
          Row(Seq(Row(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
            21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
            42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62,
            63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
            84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))))
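
If splitting is driven by a code-size threshold, as speculated above, then the expression count at which it starts depends on how much code each expression generates rather than on the count itself. The following is a rough, self-contained sketch of greedy size-based grouping. `ToyChunker` is a hypothetical name, and both the threshold value and the use of plain string length as the size measure are assumptions, not Spark's actual criterion.

```scala
object ToyChunker {
  // Greedily groups code snippets so that each group's total length stays
  // within `threshold` characters. A single snippet that is longer than the
  // threshold still forms a group of its own.
  def chunk(snippets: Seq[String], threshold: Int): Seq[Seq[String]] = {
    val groups = scala.collection.mutable.ArrayBuffer.empty[Vector[String]]
    var current = Vector.empty[String]
    var length = 0
    for (s <- snippets) {
      // Close the current group when adding this snippet would overflow it.
      if (current.nonEmpty && length + s.length > threshold) {
        groups += current
        current = Vector.empty
        length = 0
      }
      current :+= s
      length += s.length
    }
    if (current.nonEmpty) groups += current
    groups.toVector
  }
}
```

With this model, many short snippets stay in one group (no split), while a few long snippets are spread over several groups, which is one way the same number of expressions could split in one setup and not in another.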

Member

With or without this PR, I see the split methods in the generated code when using the original test case (i.e. (0 to 5).map...):

private ArrayData ArraysZip_0(InternalRow i) {
  ArrayData[] arrVals_0 = new ArrayData[6];
  int biggestCardinality_0 = 0;
  ArrayData value_0 = null;

  biggestCardinality_0 = getValuesAndCardinalities_0_0(i, arrVals_0, biggestCardinality_0);
  biggestCardinality_0 = getValuesAndCardinalities_0_1(i, arrVals_0, biggestCardinality_0);
  biggestCardinality_0 = getValuesAndCardinalities_0_2(i, arrVals_0, biggestCardinality_0);
  boolean isNull_0 = biggestCardinality_0 == -1;
...
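
The three assignment lines in the generated code above have the shape that the `foldFunctions` lambda from the diff would produce. A small standalone illustration follows; `FoldDemo` and the hard-coded call strings are hypothetical and only mimic the names seen in the generated code, since the real call strings are built by the codegen context.

```scala
object FoldDemo {
  // Name of the generated variable, copied from the generated code above.
  val biggestCardinality = "biggestCardinality_0"

  // Same lambda as in the diff: each helper call's return value is assigned
  // back to the running cardinality variable.
  val foldFunctions: Seq[String] => String =
    _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n")

  // Hypothetical call strings shaped like the ones in the generated code.
  val calls: Seq[String] = (0 to 2).map { n =>
    s"getValuesAndCardinalities_0_$n(i, arrVals_0, $biggestCardinality)"
  }

  val folded: String = foldFunctions(calls)
}
```

Printing `FoldDemo.folded` gives one assignment per helper call, so the running maximum (or the -1 null marker) is threaded through each split function in turn.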

Contributor Author

@bersprockets It does not split when whole-stage codegen is enabled, because in that case we cannot (at least for now) split methods that use the current inputs.

Contributor

@mgaido91 Got it, you were testing that it does not split when whole-stage codegen is enabled.

checkAnswer(df.select(arrays_zip(exprs: _*)),
Row(Seq(Row(0, 1, 2, 3, 4, 5))))
}
}
}

test("map size function") {
val df = Seq(
(Map[Int, Int](1 -> 1, 2 -> 2), "x"),