From b8acae2d35c342a117222a3a0cc111f31bd4b4c4 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 20 Jul 2017 13:25:16 -0700 Subject: [PATCH 1/2] fix memory leak on SortMergeJoin --- .../execution/joins/SortMergeJoinExec.scala | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 639b8e00c121..a9ee97390d41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -179,6 +179,7 @@ case class SortMergeJoinExec( currentRightMatches = null currentLeftRow = null rightMatchesIterator = null + smjScanner.destruct() return false } } @@ -188,6 +189,7 @@ case class SortMergeJoinExec( return true } } + smjScanner.destruct() false } @@ -266,6 +268,7 @@ case class SortMergeJoinExec( } } } + smjScanner.destruct() false } @@ -306,6 +309,7 @@ case class SortMergeJoinExec( return true } } + smjScanner.destruct() false } @@ -344,6 +348,7 @@ case class SortMergeJoinExec( numOutputRows += 1 return true } + smjScanner.destruct() false } @@ -604,6 +609,12 @@ case class SortMergeJoinExec( | } | if (shouldStop()) return; |} + |while ($leftInput.hasNext()) { + | $leftInput.next(); + |} + while ($rightInput.hasNext()) { + | $rightInput.next(); + |} """.stripMargin } } @@ -649,6 +660,11 @@ private[joins] class SortMergeJoinScanner( // Initialization (note: do _not_ want to advance streamed here). advancedBufferedToRowWithNullFreeJoinKey() + def destruct(): Unit = { + while (streamedIter.advanceNext()) {} + while (bufferedIter.advanceNext()) {} + } + // --- Public methods --------------------------------------------------------------------------- def getStreamedRow: InternalRow = streamedRow @@ -915,7 +931,11 @@ private abstract class OneSideOuterIterator( override def advanceNext(): Boolean = { val r = advanceBufferUntilBoundConditionSatisfied() || advanceStream() - if (r) numOutputRows += 1 + if (r) { + numOutputRows += 1 + } else { + smjScanner.destruct() + } r } @@ -947,6 +967,10 @@ private class SortMergeFullOuterJoinScanner( advancedLeft() advancedRight() + def destruct(): Unit = { + while (leftIter.advanceNext()) {} + while(rightIter.advanceNext()) {} + } // --- Private methods -------------------------------------------------------------------------- /** @@ -1103,7 +1127,11 @@ private class FullOuterIterator( override def advanceNext(): Boolean = { val r = smjScanner.advanceNext() - if (r) numRows += 1 + if (r) { + numRows += 1 + } else { + smjScanner.destruct(); + } r } From 2703c1fa262dda380232812b8a157d0538bdffce Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 20 Jul 2017 20:52:13 -0700 Subject: [PATCH 2/2] solve review comments --- .../apache/spark/sql/execution/joins/SortMergeJoinExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index a9ee97390d41..799b2fed81cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -969,7 +969,7 @@ private class SortMergeFullOuterJoinScanner( def destruct(): Unit = { while (leftIter.advanceNext()) {} - while(rightIter.advanceNext()) {} + while (rightIter.advanceNext()) {} } // --- Private methods --------------------------------------------------------------------------