@@ -132,28 +132,32 @@ private[sql] case class InsertIntoFSBasedRelation(
132132 df : DataFrame ,
133133 partitionColumns : Array [String ]): Unit = {
134134
135+ require(
136+ df.schema == relation.schema,
137+ s """ DataFrame must have the same schema as the relation to which is inserted.
138+ |DataFrame schema: ${df.schema}
139+ |Relation schema: ${relation.schema}
140+ """ .stripMargin)
141+
135142 val sqlContext = df.sqlContext
136143
137144 val (partitionRDD, dataRDD) = {
138145 val fieldNames = relation.schema.fieldNames
139- val (partitionCols, dataCols) = fieldNames.partition (partitionColumns.contains)
146+ val dataCols = fieldNames.filterNot (partitionColumns.contains)
140147 val df = sqlContext.createDataFrame(
141148 DataFrame (sqlContext, query).queryExecution.toRdd,
142149 relation.schema,
143150 needsConversion = false )
144151
145- assert(
146- partitionCols.sameElements(partitionColumns), {
147- val insertionPartitionCols = partitionColumns.mkString(" ," )
148- val relationPartitionCols =
149- relation.partitionSpec.partitionColumns.fieldNames.mkString(" ," )
150- s """ Partition columns mismatch.
151- |Expected: $relationPartitionCols
152- |Actual: $insertionPartitionCols
153- """ .stripMargin
154- })
155-
156- val partitionDF = df.select(partitionCols.head, partitionCols.tail: _* )
152+ val partitionColumnsInSpec = relation.partitionSpec.partitionColumns.map(_.name)
153+ require(
154+ partitionColumnsInSpec.sameElements(partitionColumns),
155+ s """ Partition columns mismatch.
156+ |Expected: ${partitionColumnsInSpec.mkString(" , " )}
157+ |Actual: ${partitionColumns.mkString(" , " )}
158+ """ .stripMargin)
159+
160+ val partitionDF = df.select(partitionColumns.head, partitionColumns.tail: _* )
157161 val dataDF = df.select(dataCols.head, dataCols.tail: _* )
158162
159163 (partitionDF.rdd, dataDF.rdd)
0 commit comments