@@ -201,7 +201,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {

   override def toSeq: Seq[Any] = values.toSeq

-  override def copy(): Row = this
+  override def copy(): Row = new GenericRow(values.clone())

Contributor

GenericRow is supposed to be immutable, so we really shouldn't need to copy its values. Could you explain why this is needed?

Author

Yes, it is immutable, but if you want to extract the values of a GenericRow, change them, and create a new row, then it's better not to change the original values.
So you would first want to obtain a copy of the original row.

Also, the method name copy() implies that it returns a real copy and not a reference to the original object.

Other rows like GenericInternalRow implement a correct copy method:
override def copy(): InternalRow = new GenericInternalRow(values.clone())

Contributor

You cannot put a value back into the GenericRow once you have taken it out, unless it is a mutable object. Could you provide an example of this? It would also be a good basis for a unit test.

copy is merely a contract that all rows need to adhere to. If a row is immutable, why copy it? Returning this also avoids a lot of object allocations. I think GenericInternalRow should also return a reference to itself on copy.
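
To illustrate the "unless it is a mutable object" caveat, here is a minimal sketch in plain Scala (no Spark). The object and method names are hypothetical. It only shows that Array.clone() is a shallow copy: replacing a slot in the clone leaves the original untouched, but mutable elements remain shared between both arrays.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration, not Spark code.
object ShallowCloneSketch {
  // Returns the original array after its clone has been modified.
  def originalAfterCloneEdits(): Array[Any] = {
    val original = Array[Any](1L, ArrayBuffer("a"))
    val cloned = original.clone()
    // Replaces a slot in the clone only; the original still holds 1L.
    cloned(0) = 10L
    // Mutates the ArrayBuffer instance that both arrays reference.
    cloned(1).asInstanceOf[ArrayBuffer[String]] += "b"
    original
  }
}
```

So values.clone() would protect the row's slots, but not any mutable objects stored in them.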

Author

I'll try to provide a simple example, although I can't run the code below right now. Imagine you want to query a row, copy it, change a value, and then build a new row. Then you want to build a DF out of the new and the old row. I know this is a strange example and there are a lot of better ways to implement it, but you could implement it this way:

import org.apache.spark.sql.Row
// Assumes a spark-shell session where sc, sqlContext and sqlContext.implicits._ are in scope.
val df = sc.parallelize(Seq(1, 2, 3, 5, 6, 7, 8, 9)).toDF
val firstRow = df.first
val firstRowCopied = firstRow.copy()  // <-- HERE the row will not be copied.
val arr = firstRowCopied.toSeq.toArray
arr(0) = arr(0).asInstanceOf[Int] * 10
val newRow = Row.fromSeq(arr)
// Both rows will contain the value 10 and share the same value sequence.
sqlContext.createDataFrame(sc.parallelize(Seq(firstRow, newRow)), df.schema)

If you don't want anyone to use the copy method because it does not do what its name implies, then I think we should provide another trait/contract that does not include this method. It's just confusing if the copy method does not copy the object.
row.copy() and just row are identical so far!

I don't see why we shouldn't provide a working copy method for the users who want to use it.

Contributor

I have been doing some digging, using an adapted version of your code (changed only enough to make it run):

import org.apache.spark.sql.Row;
val df = sqlContext.range(1, 9).toDF
val firstRow = df.first
val firstRowCopied = firstRow.copy()  // <-- HERE the row will not be copied.
val arr = firstRowCopied.toSeq.toArray
arr(0) =  arr(0).asInstanceOf[Long] * 10
val newRow = Row.fromSeq(arr)
val newDf = sqlContext.createDataFrame(sc.parallelize(Seq(firstRow, newRow)), df.schema)

newDf.show yields the following result:

+---+
| id|
+---+
| 10|
| 10|
+---+

This is wrong. What happens is that firstRowCopied.toSeq wraps the value array in an ArrayOps object, and this wrapper returns the backing array itself (instead of a copy) when you invoke toArray. This really shouldn't happen, because you now gain mutable access to a structure that is supposed to be immutable. I think we should change the toSeq method instead.
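
A minimal sketch of what changing toSeq could look like, assuming a simplified stand-in class (SimpleRow and SimpleRowSketch are hypothetical names, not Spark's GenericRow). The idea is for toSeq to copy the elements into an immutable Vector, so that a later toArray can never hand back the backing array, while copy() keeps returning this.

```scala
// Hypothetical stand-in for GenericRow; not Spark's actual class.
final class SimpleRow(private val values: Array[Any]) {
  // Copy the elements into an immutable Vector so callers never see the backing array.
  def toSeq: Seq[Any] = values.toVector

  // Returning `this` is safe as long as no accessor leaks the backing array.
  def copy(): SimpleRow = this

  def get(i: Int): Any = values(i)
}

object SimpleRowSketch {
  // Mirrors the steps from the example: take the values out, change one, return the array.
  def mutateViaToSeq(row: SimpleRow): Array[Any] = {
    val arr = row.toSeq.toArray // a fresh array built from the Vector
    arr(0) = "changed"
    arr
  }
}
```

With this variant the original row keeps its first value after mutateViaToSeq. The trade-off is one extra copy per toSeq call, and the sketch's constructor still stores the array it is given without copying it.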

Author

I'm not sure what the best solution is. Maybe we need some help from more experienced committers here. I think we agree that a change is needed here, but we differ on what the change should look like.

Anyway, it's not urgent, because the wrong behavior should affect only a few developers/users.

}

class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
16 changes: 16 additions & 0 deletions sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
@@ -104,4 +104,20 @@ class RowTest extends FunSpec with Matchers {
       internalRow shouldEqual internalRow2
     }
   }
+
+  describe("row copy") {
+    val noSchemaRowCopy = noSchemaRow.copy()
+    it("equality check for copied rows") {
+      noSchemaRowCopy shouldEqual noSchemaRow
+    }
+
+    val noSchemaRowCopySec = noSchemaRow.copy()
+    val array = noSchemaRowCopySec.toSeq.toArray
+    array(0) = "value3"
+    val newRow = Row.fromSeq(array)
+    it("check mutating a copied row's internal array does not affect the original row") {
+      newRow.getAs[String](0) shouldBe "value3"
+      noSchemaRow.getAs[String](0) shouldBe "value1"
+    }
+  }
 }