
Commit 0b8cd70

Adds Scala/Catalyst row conversion when writing non-partitioned tables
1 parent fa543f3

1 file changed

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 12 additions & 3 deletions
@@ -30,6 +30,7 @@ import parquet.hadoop.util.ContextUtil
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
@@ -123,9 +124,17 @@ private[sql] case class InsertIntoFSBasedRelation(
       writerContainer.executorSideSetup(taskContext)

       try {
-        while (iterator.hasNext) {
-          val row = iterator.next()
-          writerContainer.outputWriterForRow(row).write(row)
+        if (relation.needConversion) {
+          val converter = CatalystTypeConverters.createToScalaConverter(relation.dataSchema)
+          while (iterator.hasNext) {
+            val row = converter(iterator.next()).asInstanceOf[Row]
+            writerContainer.outputWriterForRow(row).write(row)
+          }
+        } else {
+          while (iterator.hasNext) {
+            val row = iterator.next()
+            writerContainer.outputWriterForRow(row).write(row)
+          }
         }
         writerContainer.commitTask()
       } catch { case cause: Throwable =>
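
For readers skimming the diff: the new branch hoists the needConversion check out of the per-row loop, so the Catalyst-to-Scala converter is built once per task via CatalystTypeConverters.createToScalaConverter(relation.dataSchema) (an untyped converter, hence the .asInstanceOf[Row] cast in the loop body), and relations whose rows already carry Scala values skip conversion entirely. Below is a minimal standalone sketch of that pattern; the names (ConversionLoopSketch, writeRows, the Row alias, toScala) are illustrative stand-ins, not Spark's actual internal API.

// Hypothetical sketch of the hoisted-conversion pattern used in this commit
// (not Spark code). The needConversion branch runs once per task rather than
// once per row, and the converter is created a single time before the loop.
object ConversionLoopSketch {
  type Row = Seq[Any] // stand-in for org.apache.spark.sql.Row

  def writeRows(
      iterator: Iterator[Row],
      needConversion: Boolean,
      toScala: Row => Row, // stand-in for the createToScalaConverter result
      write: Row => Unit): Unit = {
    if (needConversion) {
      while (iterator.hasNext) {
        write(toScala(iterator.next())) // convert Catalyst values to Scala values first
      }
    } else {
      while (iterator.hasNext) {
        write(iterator.next()) // rows already use Scala types; write them directly
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Iterator[Row](Seq(1, "a"), Seq(2, "b"))
    writeRows(rows, needConversion = true, toScala = identity, write = r => println(r))
  }
}

Duplicating the while loop rather than branching inside it mirrors the committed code: it avoids a per-row conditional and avoids paying for a no-op conversion on relations that do not need one.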
