From cdebdf5d79cb6c3c18b9dfed7ded5c896556b21c Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 24 Mar 2015 01:04:46 -0700 Subject: [PATCH 1/2] Use completion iterator to close external sorter --- .../org/apache/spark/sql/execution/basicOperators.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 20c9bc3e7554..cabad95b93ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.util.MutablePair +import org.apache.spark.util.{CompletionIterator, MutablePair} import org.apache.spark.util.collection.ExternalSorter /** @@ -194,7 +194,8 @@ case class ExternalSort( val ordering = newOrdering(sortOrder, child.output) val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) sorter.insertAll(iterator.map(r => (r, null))) - sorter.iterator.map(_._1) + val baseIterator = sorter.iterator.map(_._1) + CompletionIterator(baseIterator, sorter.stop()) }, preservesPartitioning = true) } From cb13d3c0e880ecef1812e539e37d7ea086b5dc28 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 24 Mar 2015 01:11:25 -0700 Subject: [PATCH 2/2] hint to inferencer --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index cabad95b93ee..1f5251a20376 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -195,7 +195,8 @@ case class ExternalSort( val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) sorter.insertAll(iterator.map(r => (r, null))) val baseIterator = sorter.iterator.map(_._1) - CompletionIterator(baseIterator, sorter.stop()) + // TODO(marmbrus): The complex type signature below thwarts inference for no reason. + CompletionIterator[Row, Iterator[Row]](baseIterator, sorter.stop()) }, preservesPartitioning = true) }