Skip to content

Commit 9869ec2

Browse files
committed
Clean up Exchange code a bit
1 parent 82bb0ec commit 9869ec2

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20+
import org.apache.spark.sql.catalyst.util.DateUtils
21+
import org.apache.spark.sql.catalyst.InternalRow
2022
import org.apache.spark.sql.catalyst.util.ObjectPool
2123
import org.apache.spark.sql.types._
2224
import org.apache.spark.unsafe.PlatformDependent
@@ -101,6 +103,19 @@ class UnsafeRowConverter(fieldTypes: Array[DataType]) {
101103

102104
}
103105

106+
object UnsafeRowConverter {
107+
def supportsSchema(schema: StructType): Boolean = {
108+
schema.forall { field =>
109+
try {
110+
UnsafeColumnWriter.forType(field.dataType)
111+
true
112+
} catch {
113+
case e: UnsupportedOperationException => false
114+
}
115+
}
116+
}
117+
}
118+
104119
/**
105120
* Function for writing a column into an UnsafeRow.
106121
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.execution
1919

20-
import scala.util.control.NonFatal
21-
2220
import org.apache.spark.annotation.DeveloperApi
2321
import org.apache.spark.rdd.{RDD, ShuffledRDD}
2422
import org.apache.spark.serializer.Serializer
@@ -304,16 +302,8 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
304302
}
305303

306304
val withSort = if (needSort) {
307-
// TODO(josh): this is a hack. Need a better way to determine whether UnsafeRow
308-
// supports the given schema.
309-
val supportsUnsafeRowConversion: Boolean = try {
310-
new UnsafeRowConverter(withShuffle.schema.map(_.dataType).toArray)
311-
true
312-
} catch {
313-
case NonFatal(e) =>
314-
false
315-
}
316-
if (sqlContext.conf.unsafeEnabled && supportsUnsafeRowConversion) {
305+
if (sqlContext.conf.unsafeEnabled
306+
&& UnsafeRowConverter.supportsSchema(withShuffle.schema)) {
317307
UnsafeExternalSort(rowOrdering, global = false, withShuffle)
318308
} else if (sqlContext.conf.externalSortEnabled) {
319309
ExternalSort(rowOrdering, global = false, withShuffle)

0 commit comments

Comments
 (0)