Account for pyarrow API changes, read multiple record batches #22
Conversation
…batches in a stream
|
FWIW, on my laptop, starting with the benchmark.py script from the JIRA issue, about 98% of the runtime is obtaining the data payload from Spark and only 2% is Arrow deserialization. |
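For reference, a minimal sketch of how a breakdown like that might be measured, assuming a running SparkSession named `spark`, the `useArrow` flag from this branch, and a made-up DataFrame standing in for the benchmark.py data:
```python
import cProfile
import pstats

# Stand-in for the benchmark.py DataFrame (sizes and columns are illustrative).
df = spark.range(1 << 20).selectExpr("id", "id * 2 as value")

# Profile the collection path: the cumulative times separate the time spent
# pulling the payload from Spark from the Arrow deserialization on the driver.
profiler = cProfile.Profile()
profiler.enable()
pdf = df.toPandas(useArrow=True)
profiler.disable()

pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)
```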
|
No problem, looks much better now. That's an interesting profile; hopefully collecting at the executors will put a dent in that. |
…batches in a stream closes #22
|
@BryanCutler @icexelloss can you kindly give me instructions on how to see the Spark driver / executor logs? I added some logging statements and have been scouring the internet for how to get these to print to the console or a file, but I haven't had any success. The codebase is littered with log4j properties files, and I can't tell which one I need to edit. This is based on starting Spark in a Jupyter notebook using, effectively, the code found in benchmark.py in https://issues.apache.org/jira/browse/SPARK-13534 |
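For context, a rough sketch of the notebook setup being described, assuming a local build of this branch is on the Python path; the session options and data here are illustrative:
```python
from pyspark.sql import SparkSession

# Local session started from the notebook, mirroring what benchmark.py does.
spark = (SparkSession.builder
         .master("local[*]")
         .appName("arrow-collect-benchmark")
         .getOrCreate())

df = spark.range(1 << 20).selectExpr("id", "rand() as value")
pdf = df.toPandas(useArrow=True)  # Arrow-backed collection added by this branch
```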
|
Wes,
The one that Spark uses should be conf/log4j.properties. Have you tried that one?
|
Can you give me an example properties file and let me know where to find the logs? Do I need to rebuild from scratch? |
|
@BryanCutler I put up a branch here wesm@de4c6ed -- trying desperately to get logging output from Spark SQL but not having any success |
|
@wesm, there should be a sample log4j.properties.template file in the SPARK_HOME/conf dir. I'll take a look at the branch as soon as I get in.
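A small sketch of turning that template into an active config, assuming a standard Spark layout with SPARK_HOME set (file names follow the stock distribution):
```python
import os
import shutil

# Copy the shipped sample so Spark picks up conf/log4j.properties at startup.
conf_dir = os.path.join(os.environ["SPARK_HOME"], "conf")
shutil.copy(os.path.join(conf_dir, "log4j.properties.template"),
            os.path.join(conf_dir, "log4j.properties"))
print("Edit", os.path.join(conf_dir, "log4j.properties"), "to adjust log levels and appenders")
```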
|
Your changes worked for me, from running on the command line in local mode, at least. I'm not sure if the driver log would get redirected somewhere when running Jupyter though. I ran with this command (just redirecting stderr, since the logs go there by default) |
|
log4j.properties.txt |
|
Will give it a shot. I'm using Parquet for the benchmarks, which appears to cause some problems with the logging system |
|
I'm able to get the logs in spark-submit; still trying to get them interactively in a Jupyter notebook or via |
|
I added this section to the log4j file to set up an appender. Then running the code below in pyspark shows your log:
```
In [3]: df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])

In [4]: df.toPandas(useArrow=True)
Time to collect: 766972422
17/01/31 11:38:54 INFO ArrowConverters: Arrow conversion time: 48550299
```
I don't have a Jupyter install at the moment, but I think it would apply there too. You can also set |
|
thanks, I will try when I can! |
…batches in a stream closes #22
### What changes were proposed in this pull request? Adding codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen. So most of code change is to refactor existing codegen in `BroadcastHashJoinExec` to `HashJoin`. Example codegen for query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153): ``` def shuffleHashJoin(): Unit = { val N: Long = 4 << 20 withSQLConf( SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000", SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { codegenBenchmark("shuffle hash join", N) { val df1 = spark.range(N).selectExpr(s"id as k1") val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2") val df = df1.join(df2, col("k1") === col("k2")) assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined) df.noop() } } } ``` Shuffled hash join codegen: ``` == Subtree 3 / 3 (maxMethodCodeSize:113; maxConstantPoolSize:126(0.19% used); numInnerClasses:0) == *(3) ShuffledHashJoin [k1#2L], [k2#6L], Inner, BuildRight :- *(1) Project [id#0L AS k1#2L] : +- *(1) Range (0, 4194304, step=1, splits=1) +- *(2) Project [(id#4L * 3) AS k2#6L] +- *(2) Range (0, 1398101, step=1, splits=1) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage3(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=3 /* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private scala.collection.Iterator inputadapter_input_0; /* 010 */ private org.apache.spark.sql.execution.joins.HashedRelation shj_relation_0; /* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] shj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1]; /* 012 */ /* 013 */ public GeneratedIteratorForCodegenStage3(Object[] references) { /* 014 */ this.references = references; /* 015 */ } /* 016 */ /* 017 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 018 */ partitionIndex = index; /* 019 */ this.inputs = inputs; /* 020 */ inputadapter_input_0 = inputs[0]; /* 021 */ shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]); /* 022 */ shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 023 */ /* 024 */ } /* 025 */ /* 026 */ private void shj_doConsume_0(InternalRow inputadapter_row_0, long shj_expr_0_0) throws java.io.IOException { /* 027 */ // generate join key for stream side /* 028 */ /* 029 */ // find matches from HashRelation /* 030 */ scala.collection.Iterator shj_matches_0 = false ? 
/* 031 */ null : (scala.collection.Iterator)shj_relation_0.get(shj_expr_0_0); /* 032 */ if (shj_matches_0 != null) { /* 033 */ while (shj_matches_0.hasNext()) { /* 034 */ UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next(); /* 035 */ { /* 036 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1); /* 037 */ /* 038 */ long shj_value_1 = shj_matched_0.getLong(0); /* 039 */ shj_mutableStateArray_0[0].reset(); /* 040 */ /* 041 */ shj_mutableStateArray_0[0].write(0, shj_expr_0_0); /* 042 */ /* 043 */ shj_mutableStateArray_0[0].write(1, shj_value_1); /* 044 */ append((shj_mutableStateArray_0[0].getRow()).copy()); /* 045 */ /* 046 */ } /* 047 */ } /* 048 */ } /* 049 */ /* 050 */ } /* 051 */ /* 052 */ protected void processNext() throws java.io.IOException { /* 053 */ while ( inputadapter_input_0.hasNext()) { /* 054 */ InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next(); /* 055 */ /* 056 */ long inputadapter_value_0 = inputadapter_row_0.getLong(0); /* 057 */ /* 058 */ shj_doConsume_0(inputadapter_row_0, inputadapter_value_0); /* 059 */ if (shouldStop()) return; /* 060 */ } /* 061 */ } /* 062 */ /* 063 */ } ``` Broadcast hash join codegen for the same query (for reference here): ``` == Subtree 2 / 2 (maxMethodCodeSize:280; maxConstantPoolSize:218(0.33% used); numInnerClasses:0) == *(2) BroadcastHashJoin [k1#2L], [k2#6L], Inner, BuildRight, false :- *(2) Project [id#0L AS k1#2L] : +- *(2) Range (0, 4194304, step=1, splits=1) +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#22] +- *(1) Project [(id#4L * 3) AS k2#6L] +- *(1) Range (0, 1398101, step=1, splits=1) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage2(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=2 /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean range_initRange_0; /* 010 */ private long range_nextIndex_0; /* 011 */ private TaskContext range_taskContext_0; /* 012 */ private InputMetrics range_inputMetrics_0; /* 013 */ private long range_batchEnd_0; /* 014 */ private long range_numElementsTodo_0; /* 015 */ private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage2(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ /* 026 */ range_taskContext_0 = TaskContext.get(); /* 027 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics(); /* 028 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 029 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ /* 032 */ bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) 
((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy(); /* 033 */ incPeakExecutionMemory(bhj_relation_0.estimatedSize()); /* 034 */ /* 035 */ range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 036 */ /* 037 */ } /* 038 */ /* 039 */ private void initRange(int idx) { /* 040 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 041 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L); /* 042 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(4194304L); /* 043 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 044 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 045 */ long partitionEnd; /* 046 */ /* 047 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 048 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 049 */ range_nextIndex_0 = Long.MAX_VALUE; /* 050 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 051 */ range_nextIndex_0 = Long.MIN_VALUE; /* 052 */ } else { /* 053 */ range_nextIndex_0 = st.longValue(); /* 054 */ } /* 055 */ range_batchEnd_0 = range_nextIndex_0; /* 056 */ /* 057 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 058 */ .multiply(step).add(start); /* 059 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 060 */ partitionEnd = Long.MAX_VALUE; /* 061 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 062 */ partitionEnd = Long.MIN_VALUE; /* 063 */ } else { /* 064 */ partitionEnd = end.longValue(); /* 065 */ } /* 066 */ /* 067 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 068 */ java.math.BigInteger.valueOf(range_nextIndex_0)); /* 069 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue(); /* 070 */ if (range_numElementsTodo_0 < 0) { /* 071 */ range_numElementsTodo_0 = 0; /* 072 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 073 */ range_numElementsTodo_0++; /* 074 */ } /* 075 */ } /* 076 */ /* 077 */ private void bhj_doConsume_0(long bhj_expr_0_0) throws java.io.IOException { /* 078 */ // generate join key for stream side /* 079 */ /* 080 */ // find matches from HashedRelation /* 081 */ UnsafeRow bhj_matched_0 = false ? 
null: (UnsafeRow)bhj_relation_0.getValue(bhj_expr_0_0); /* 082 */ if (bhj_matched_0 != null) { /* 083 */ { /* 084 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 085 */ /* 086 */ long bhj_value_2 = bhj_matched_0.getLong(0); /* 087 */ range_mutableStateArray_0[3].reset(); /* 088 */ /* 089 */ range_mutableStateArray_0[3].write(0, bhj_expr_0_0); /* 090 */ /* 091 */ range_mutableStateArray_0[3].write(1, bhj_value_2); /* 092 */ append((range_mutableStateArray_0[3].getRow())); /* 093 */ /* 094 */ } /* 095 */ } /* 096 */ /* 097 */ } /* 098 */ /* 099 */ protected void processNext() throws java.io.IOException { /* 100 */ // initialize Range /* 101 */ if (!range_initRange_0) { /* 102 */ range_initRange_0 = true; /* 103 */ initRange(partitionIndex); /* 104 */ } /* 105 */ /* 106 */ while (true) { /* 107 */ if (range_nextIndex_0 == range_batchEnd_0) { /* 108 */ long range_nextBatchTodo_0; /* 109 */ if (range_numElementsTodo_0 > 1000L) { /* 110 */ range_nextBatchTodo_0 = 1000L; /* 111 */ range_numElementsTodo_0 -= 1000L; /* 112 */ } else { /* 113 */ range_nextBatchTodo_0 = range_numElementsTodo_0; /* 114 */ range_numElementsTodo_0 = 0; /* 115 */ if (range_nextBatchTodo_0 == 0) break; /* 116 */ } /* 117 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L; /* 118 */ } /* 119 */ /* 120 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L); /* 121 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) { /* 122 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0; /* 123 */ /* 124 */ bhj_doConsume_0(range_value_0); /* 125 */ /* 126 */ if (shouldStop()) { /* 127 */ range_nextIndex_0 = range_value_0 + 1L; /* 128 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1); /* 129 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1); /* 130 */ return; /* 131 */ } /* 132 */ /* 133 */ } /* 134 */ range_nextIndex_0 = range_batchEnd_0; /* 135 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0); /* 136 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0); /* 137 */ range_taskContext_0.killTaskIfInterrupted(); /* 138 */ } /* 139 */ } /* 140 */ /* 141 */ } ``` ### Why are the changes needed? Codegen shuffled hash join can help save CPU cost. We added shuffled hash join codegen internally in our fork, and seeing obvious improvement in benchmark compared to current non-codegen code path. 
Test example query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153), seeing 30% wall clock time improvement compared to existing non-codegen code path:

Enable shuffled hash join code-gen:
```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1358 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2323 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    649            679          43          6.5         154.7       1.0X
shuffle hash join wholestage on                     436            465          45          9.6         103.9       1.5X
```

Disable shuffled hash join codegen:
```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1345 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2967 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    646            673          37          6.5         154.1       1.0X
shuffle hash join wholestage on                     549            594          47          7.6         130.9       1.2X
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added unit test in `WholeStageCodegenSuite`.

Closes apache#29277 from c21/codegen.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request? `BroadcastNestedLoopJoinExec` does not have code-gen, and we can potentially boost the CPU performance for this operator if we add code-gen for it. https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html also showed the evidence in one fork. The codegen for `BroadcastNestedLoopJoinExec` shared some code with `HashJoin`, and the interface `JoinCodegenSupport` is created to hold those common logic. This PR is only supporting inner and cross join. Other join types will be added later in followup PRs. Example query and generated code: ``` val df1 = spark.range(4).select($"id".as("k1")) val df2 = spark.range(3).select($"id".as("k2")) df1.join(df2, $"k1" + 1 =!= $"k2").explain("codegen") ``` ``` == Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:203(0.31% used); numInnerClasses:0) == *(2) BroadcastNestedLoopJoin BuildRight, Inner, NOT ((k1#2L + 1) = k2#6L) :- *(2) Project [id#0L AS k1#2L] : +- *(2) Range (0, 4, step=1, splits=2) +- BroadcastExchange IdentityBroadcastMode, [id=#22] +- *(1) Project [id#4L AS k2#6L] +- *(1) Range (0, 3, step=1, splits=2) Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage2(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=2 /* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean range_initRange_0; /* 010 */ private long range_nextIndex_0; /* 011 */ private TaskContext range_taskContext_0; /* 012 */ private InputMetrics range_inputMetrics_0; /* 013 */ private long range_batchEnd_0; /* 014 */ private long range_numElementsTodo_0; /* 015 */ private InternalRow[] bnlj_buildRowArray_0; /* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4]; /* 017 */ /* 018 */ public GeneratedIteratorForCodegenStage2(Object[] references) { /* 019 */ this.references = references; /* 020 */ } /* 021 */ /* 022 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 023 */ partitionIndex = index; /* 024 */ this.inputs = inputs; /* 025 */ /* 026 */ range_taskContext_0 = TaskContext.get(); /* 027 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics(); /* 028 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 029 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 030 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0); /* 031 */ bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value(); /* 032 */ range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0); /* 033 */ /* 034 */ } /* 035 */ /* 036 */ private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException { /* 037 */ for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) { /* 038 */ UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0]; /* 039 */ /* 040 */ long bnlj_value_1 = bnlj_buildRow_0.getLong(0); 
/* 041 */ /* 042 */ long bnlj_value_4 = -1L; /* 043 */ /* 044 */ bnlj_value_4 = bnlj_expr_0_0 + 1L; /* 045 */ /* 046 */ boolean bnlj_value_3 = false; /* 047 */ bnlj_value_3 = bnlj_value_4 == bnlj_value_1; /* 048 */ boolean bnlj_value_2 = false; /* 049 */ bnlj_value_2 = !(bnlj_value_3); /* 050 */ if (!(false || !bnlj_value_2)) /* 051 */ { /* 052 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 053 */ /* 054 */ range_mutableStateArray_0[3].reset(); /* 055 */ /* 056 */ range_mutableStateArray_0[3].write(0, bnlj_expr_0_0); /* 057 */ /* 058 */ range_mutableStateArray_0[3].write(1, bnlj_value_1); /* 059 */ append((range_mutableStateArray_0[3].getRow()).copy()); /* 060 */ /* 061 */ } /* 062 */ } /* 063 */ /* 064 */ } /* 065 */ /* 066 */ private void initRange(int idx) { /* 067 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx); /* 068 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(2L); /* 069 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L); /* 070 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L); /* 071 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L); /* 072 */ long partitionEnd; /* 073 */ /* 074 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start); /* 075 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 076 */ range_nextIndex_0 = Long.MAX_VALUE; /* 077 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 078 */ range_nextIndex_0 = Long.MIN_VALUE; /* 079 */ } else { /* 080 */ range_nextIndex_0 = st.longValue(); /* 081 */ } /* 082 */ range_batchEnd_0 = range_nextIndex_0; /* 083 */ /* 084 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice) /* 085 */ .multiply(step).add(start); /* 086 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) { /* 087 */ partitionEnd = Long.MAX_VALUE; /* 088 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) { /* 089 */ partitionEnd = Long.MIN_VALUE; /* 090 */ } else { /* 091 */ partitionEnd = end.longValue(); /* 092 */ } /* 093 */ /* 094 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract( /* 095 */ java.math.BigInteger.valueOf(range_nextIndex_0)); /* 096 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue(); /* 097 */ if (range_numElementsTodo_0 < 0) { /* 098 */ range_numElementsTodo_0 = 0; /* 099 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) { /* 100 */ range_numElementsTodo_0++; /* 101 */ } /* 102 */ } /* 103 */ /* 104 */ protected void processNext() throws java.io.IOException { /* 105 */ // initialize Range /* 106 */ if (!range_initRange_0) { /* 107 */ range_initRange_0 = true; /* 108 */ initRange(partitionIndex); /* 109 */ } /* 110 */ /* 111 */ while (true) { /* 112 */ if (range_nextIndex_0 == range_batchEnd_0) { /* 113 */ long range_nextBatchTodo_0; /* 114 */ if (range_numElementsTodo_0 > 1000L) { /* 115 */ range_nextBatchTodo_0 = 1000L; /* 116 */ range_numElementsTodo_0 -= 1000L; /* 117 */ } else { /* 118 */ range_nextBatchTodo_0 = range_numElementsTodo_0; /* 119 */ range_numElementsTodo_0 = 0; /* 120 */ if (range_nextBatchTodo_0 == 0) break; /* 121 */ } /* 122 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L; /* 123 */ } /* 124 */ /* 125 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L); /* 126 */ for (int 
range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) { /* 127 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0; /* 128 */ /* 129 */ // common sub-expressions /* 130 */ /* 131 */ bnlj_doConsume_0(range_value_0); /* 132 */ /* 133 */ if (shouldStop()) { /* 134 */ range_nextIndex_0 = range_value_0 + 1L; /* 135 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1); /* 136 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1); /* 137 */ return; /* 138 */ } /* 139 */ /* 140 */ } /* 141 */ range_nextIndex_0 = range_batchEnd_0; /* 142 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0); /* 143 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0); /* 144 */ range_taskContext_0.killTaskIfInterrupted(); /* 145 */ } /* 146 */ } /* 147 */ /* 148 */ } ``` ### Why are the changes needed? Improve query CPU performance. Added a micro benchmark query in `JoinBenchmark.scala`. Saw 1x of run time improvement: ``` OpenJDK 64-Bit Server VM 11.0.9+11-LTS on Linux 4.14.219-161.340.amzn2.x86_64 Intel(R) Xeon(R) CPU E5-2670 v2 2.50GHz broadcast nested loop join: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- broadcast nested loop join wholestage off 62922 63052 184 0.3 3000.3 1.0X broadcast nested loop join wholestage on 30946 30972 26 0.7 1475.6 2.0X ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? * Added unit test in `WholeStageCodegenSuite.scala`, and existing unit tests for `BroadcastNestedLoopJoinExec`. * Updated golden files for several TCPDS query plans, as whole stage code-gen for `BroadcastNestedLoopJoinExec` is triggered. * Updated `JoinBenchmark-jdk11-results.txt ` and `JoinBenchmark-results.txt` with new benchmark result. Followed previous benchmark PRs - apache#27078 and apache#26003 to use same type of machine: ``` Amazon AWS EC2 type: r3.xlarge region: us-west-2 (Oregon) OS: Linux ``` Closes apache#31736 from c21/nested-join-exec. Authored-by: Cheng Su <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…ght outer)
### What changes were proposed in this pull request?
This PR is to add code-gen support for left outer (build right) and right outer (build left). Reference: `BroadcastNestedLoopJoinExec.codegenInner()` and `BroadcastNestedLoopJoinExec.outerJoin()`
### Why are the changes needed?
Improve query CPU performance.
Tested with a simple query:
```scala
val N = 20 << 20
val M = 1 << 4
val dim = broadcast(spark.range(M).selectExpr("id as k2"))
codegenBenchmark("left outer broadcast nested loop join", N) {
val df = spark.range(N).selectExpr(s"id as k1").join(
dim, col("k1") + 1 <= col("k2"), "left_outer")
assert(df.queryExecution.sparkPlan.find(
_.isInstanceOf[BroadcastNestedLoopJoinExec]).isDefined)
df.noop()
}
```
Seeing 2x run time improvement:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU 2.40GHz
left outer broadcast nested loop join:                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------------------
left outer broadcast nested loop join wholestage off           3024           3698         953          6.9         144.2       1.0X
left outer broadcast nested loop join wholestage on            1512           1659         172         13.9          72.1       2.0X
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Changed existing unit tests in `OuterJoinSuite` to cover codegen use cases.
Added unit test in `WholeStageCodegenSuite.scala` to make sure code-gen for broadcast nested loop join is taking effect, and added a test for the multiple-join case as well.
Example query:
```scala
val df1 = spark.range(4).select($"id".as("k1"))
val df2 = spark.range(3).select($"id".as("k2"))
df1.join(df2, $"k1" + 1 <= $"k2", "left_outer").explain("codegen")
```
Example generated code (`bnlj_doConsume_0` method):
```java
== Subtree 2 / 2 (maxMethodCodeSize:282; maxConstantPoolSize:210(0.32% used); numInnerClasses:0) ==
*(2) BroadcastNestedLoopJoin BuildRight, LeftOuter, ((k1#2L + 1) <= k2#6L)
:- *(2) Project [id#0L AS k1#2L]
: +- *(2) Range (0, 4, step=1, splits=16)
+- BroadcastExchange IdentityBroadcastMode, [id=#22]
+- *(1) Project [id#4L AS k2#6L]
+- *(1) Range (0, 3, step=1, splits=16)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean range_initRange_0;
/* 010 */ private long range_nextIndex_0;
/* 011 */ private TaskContext range_taskContext_0;
/* 012 */ private InputMetrics range_inputMetrics_0;
/* 013 */ private long range_batchEnd_0;
/* 014 */ private long range_numElementsTodo_0;
/* 015 */ private InternalRow[] bnlj_buildRowArray_0;
/* 016 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */ public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */ this.references = references;
/* 020 */ }
/* 021 */
/* 022 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */ partitionIndex = index;
/* 024 */ this.inputs = inputs;
/* 025 */
/* 026 */ range_taskContext_0 = TaskContext.get();
/* 027 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */ bnlj_buildRowArray_0 = (InternalRow[]) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcastTerm */).value();
/* 032 */ range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 033 */
/* 034 */ }
/* 035 */
/* 036 */ private void bnlj_doConsume_0(long bnlj_expr_0_0) throws java.io.IOException {
/* 037 */ boolean bnlj_foundMatch_0 = false;
/* 038 */ for (int bnlj_arrayIndex_0 = 0; bnlj_arrayIndex_0 < bnlj_buildRowArray_0.length; bnlj_arrayIndex_0++) {
/* 039 */ UnsafeRow bnlj_buildRow_0 = (UnsafeRow) bnlj_buildRowArray_0[bnlj_arrayIndex_0];
/* 040 */ boolean bnlj_shouldOutputRow_0 = false;
/* 041 */
/* 042 */ boolean bnlj_isNull_2 = true;
/* 043 */ long bnlj_value_2 = -1L;
/* 044 */ if (bnlj_buildRow_0 != null) {
/* 045 */ long bnlj_value_1 = bnlj_buildRow_0.getLong(0);
/* 046 */ bnlj_isNull_2 = false;
/* 047 */ bnlj_value_2 = bnlj_value_1;
/* 048 */ }
/* 049 */
/* 050 */ long bnlj_value_4 = -1L;
/* 051 */
/* 052 */ bnlj_value_4 = bnlj_expr_0_0 + 1L;
/* 053 */
/* 054 */ boolean bnlj_value_3 = false;
/* 055 */ bnlj_value_3 = bnlj_value_4 <= bnlj_value_2;
/* 056 */ if (!(false || !bnlj_value_3))
/* 057 */ {
/* 058 */ bnlj_shouldOutputRow_0 = true;
/* 059 */ bnlj_foundMatch_0 = true;
/* 060 */ }
/* 061 */ if (bnlj_arrayIndex_0 == bnlj_buildRowArray_0.length - 1 && !bnlj_foundMatch_0) {
/* 062 */ bnlj_buildRow_0 = null;
/* 063 */ bnlj_shouldOutputRow_0 = true;
/* 064 */ }
/* 065 */ if (bnlj_shouldOutputRow_0) {
/* 066 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 067 */
/* 068 */ boolean bnlj_isNull_9 = true;
/* 069 */ long bnlj_value_9 = -1L;
/* 070 */ if (bnlj_buildRow_0 != null) {
/* 071 */ long bnlj_value_8 = bnlj_buildRow_0.getLong(0);
/* 072 */ bnlj_isNull_9 = false;
/* 073 */ bnlj_value_9 = bnlj_value_8;
/* 074 */ }
/* 075 */ range_mutableStateArray_0[3].reset();
/* 076 */
/* 077 */ range_mutableStateArray_0[3].zeroOutNullBytes();
/* 078 */
/* 079 */ range_mutableStateArray_0[3].write(0, bnlj_expr_0_0);
/* 080 */
/* 081 */ if (bnlj_isNull_9) {
/* 082 */ range_mutableStateArray_0[3].setNullAt(1);
/* 083 */ } else {
/* 084 */ range_mutableStateArray_0[3].write(1, bnlj_value_9);
/* 085 */ }
/* 086 */ append((range_mutableStateArray_0[3].getRow()).copy());
/* 087 */
/* 088 */ }
/* 089 */ }
/* 090 */
/* 091 */ }
/* 092 */
/* 093 */ private void initRange(int idx) {
/* 094 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 095 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(16L);
/* 096 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(4L);
/* 097 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 098 */ java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 099 */ long partitionEnd;
/* 100 */
/* 101 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 102 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 103 */ range_nextIndex_0 = Long.MAX_VALUE;
/* 104 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 105 */ range_nextIndex_0 = Long.MIN_VALUE;
/* 106 */ } else {
/* 107 */ range_nextIndex_0 = st.longValue();
/* 108 */ }
/* 109 */ range_batchEnd_0 = range_nextIndex_0;
/* 110 */
/* 111 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 112 */ .multiply(step).add(start);
/* 113 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 114 */ partitionEnd = Long.MAX_VALUE;
/* 115 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 116 */ partitionEnd = Long.MIN_VALUE;
/* 117 */ } else {
/* 118 */ partitionEnd = end.longValue();
/* 119 */ }
/* 120 */
/* 121 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 122 */ java.math.BigInteger.valueOf(range_nextIndex_0));
/* 123 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 124 */ if (range_numElementsTodo_0 < 0) {
/* 125 */ range_numElementsTodo_0 = 0;
/* 126 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 127 */ range_numElementsTodo_0++;
/* 128 */ }
/* 129 */ }
/* 130 */
/* 131 */ protected void processNext() throws java.io.IOException {
/* 132 */ // initialize Range
/* 133 */ if (!range_initRange_0) {
/* 134 */ range_initRange_0 = true;
/* 135 */ initRange(partitionIndex);
/* 136 */ }
/* 137 */
/* 138 */ while (true) {
/* 139 */ if (range_nextIndex_0 == range_batchEnd_0) {
/* 140 */ long range_nextBatchTodo_0;
/* 141 */ if (range_numElementsTodo_0 > 1000L) {
/* 142 */ range_nextBatchTodo_0 = 1000L;
/* 143 */ range_numElementsTodo_0 -= 1000L;
/* 144 */ } else {
/* 145 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 146 */ range_numElementsTodo_0 = 0;
/* 147 */ if (range_nextBatchTodo_0 == 0) break;
/* 148 */ }
/* 149 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 150 */ }
/* 151 */
/* 152 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 153 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 154 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 155 */
/* 156 */ // common sub-expressions
/* 157 */
/* 158 */ bnlj_doConsume_0(range_value_0);
/* 159 */
/* 160 */ if (shouldStop()) {
/* 161 */ range_nextIndex_0 = range_value_0 + 1L;
/* 162 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 163 */ range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 164 */ return;
/* 165 */ }
/* 166 */
/* 167 */ }
/* 168 */ range_nextIndex_0 = range_batchEnd_0;
/* 169 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 170 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 171 */ range_taskContext_0.killTaskIfInterrupted();
/* 172 */ }
/* 173 */ }
/* 174 */
/* 175 */ }
```
Closes apache#31931 from linzebing/code-left-right-outer.
Authored-by: linzebing <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sorry about the API break. I don't expect it to happen too many more times.
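For readers landing here later, a minimal sketch of reading every record batch from an Arrow IPC stream, using current pyarrow names (which may differ from the pyarrow version this PR targeted):
```python
import pyarrow as pa

def read_all_batches(source):
    """Read every record batch from an Arrow IPC stream into one Table."""
    reader = pa.ipc.open_stream(source)
    batches = [batch for batch in reader]  # iterate until the end-of-stream marker
    # reader.read_all() is the one-call equivalent in recent pyarrow releases.
    return pa.Table.from_batches(batches)
```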