Skip to content

Commit 7fa7a2c

Browse files
committed
[SPARK-6309] [SQL] [MLlib] Implement MatrixUDT
1 parent 45f4c66 commit 7fa7a2c

File tree

2 files changed

+106
-0
lines changed

2 files changed

+106
-0
lines changed

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,17 @@ import java.util.{Arrays, Random}
2121

2222
import scala.collection.mutable.{ArrayBuilder => MArrayBuilder, HashSet => MHashSet, ArrayBuffer}
2323

24+
import org.apache.spark.annotation.DeveloperApi
25+
import org.apache.spark.sql.Row
26+
import org.apache.spark.sql.types._
27+
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
28+
2429
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
2530

2631
/**
2732
* Trait for a local matrix.
2833
*/
34+
@SQLUserDefinedType(udt = classOf[MatrixUDT])
2935
sealed trait Matrix extends Serializable {
3036

3137
/** Number of rows. */
@@ -102,6 +108,86 @@ sealed trait Matrix extends Serializable {
102108
private[spark] def foreachActive(f: (Int, Int, Double) => Unit)
103109
}
104110

111+
@DeveloperApi
112+
private[spark] class MatrixUDT extends UserDefinedType[Matrix] {
113+
114+
override def sqlType: StructType = {
115+
// type: 0 = sparse, 1 = dense
116+
// the dense matrix is built by numRows, numCols, values and isTransposed, all of which are
117+
// set as not nullable, except values since in the future, support for binary matrices might
118+
// be added for which values are not needed.
119+
// the sparse matrix needs colPtrs and rowIndices, which are set as
120+
// null, while building the dense matrix.
121+
StructType(Seq(
122+
StructField("type", ByteType, nullable = false),
123+
StructField("numRows", IntegerType, nullable = false),
124+
StructField("numCols", IntegerType, nullable = false),
125+
StructField("colPtrs", ArrayType(IntegerType, containsNull = false), nullable = true),
126+
StructField("rowIndices", ArrayType(IntegerType, containsNull = false), nullable = true),
127+
StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true),
128+
StructField("isTransposed", BooleanType, nullable = false)
129+
))
130+
}
131+
132+
override def serialize(obj: Any): Row = {
133+
val row = new GenericMutableRow(7)
134+
obj match {
135+
case sm: SparseMatrix =>
136+
row.setByte(0, 0)
137+
row.setInt(1, sm.numRows)
138+
row.setInt(2, sm.numCols)
139+
row.update(3, sm.colPtrs.toSeq)
140+
row.update(4, sm.rowIndices.toSeq)
141+
row.update(5, sm.values.toSeq)
142+
row.setBoolean(6, sm.isTransposed)
143+
144+
case dm: DenseMatrix =>
145+
row.setByte(0, 1)
146+
row.setInt(1, dm.numRows)
147+
row.setInt(2, dm.numCols)
148+
row.setNullAt(3)
149+
row.setNullAt(4)
150+
row.update(5, dm.values.toSeq)
151+
row.setBoolean(6, dm.isTransposed)
152+
}
153+
row
154+
}
155+
156+
override def deserialize(datum: Any): Matrix = {
157+
datum match {
158+
// TODO: something wrong with UDT serialization, should never happen.
159+
case m: Matrix => m
160+
case row: Row =>
161+
require(row.length == 7,
162+
s"MatrixUDT.deserialize given row with length ${row.length} but requires length == 7")
163+
val tpe = row.getByte(0)
164+
val numRows = row.getInt(1)
165+
val numCols = row.getInt(2)
166+
val values = row.getAs[Iterable[Double]](5).toArray
167+
val isTransposed = row.getBoolean(6)
168+
tpe match {
169+
case 0 =>
170+
val colPtrs = row.getAs[Iterable[Int]](3).toArray
171+
val rowIndices = row.getAs[Iterable[Int]](4).toArray
172+
new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values, isTransposed)
173+
case 1 =>
174+
new DenseMatrix(numRows, numCols, values, isTransposed)
175+
}
176+
}
177+
}
178+
179+
override def userClass: Class[Matrix] = classOf[Matrix]
180+
181+
override def equals(o: Any): Boolean = {
182+
o match {
183+
case v: MatrixUDT => true
184+
case _ => false
185+
}
186+
}
187+
188+
private[spark] override def asNullable: MatrixUDT = this
189+
}
190+
105191
/**
106192
* Column-major dense matrix.
107193
* The entry values are stored in a single array of doubles with columns listed in sequence.
@@ -119,6 +205,7 @@ sealed trait Matrix extends Serializable {
119205
* @param isTransposed whether the matrix is transposed. If true, `values` stores the matrix in
120206
* row major.
121207
*/
208+
@SQLUserDefinedType(udt = classOf[MatrixUDT])
122209
class DenseMatrix(
123210
val numRows: Int,
124211
val numCols: Int,
@@ -356,6 +443,7 @@ object DenseMatrix {
356443
* Compressed Sparse Row (CSR) format, where `colPtrs` behaves as rowPtrs,
357444
* and `rowIndices` behave as colIndices, and `values` are stored in row major.
358445
*/
446+
@SQLUserDefinedType(udt = classOf[MatrixUDT])
359447
class SparseMatrix(
360448
val numRows: Int,
361449
val numCols: Int,
@@ -431,6 +519,13 @@ class SparseMatrix(
431519
}
432520
}
433521

522+
override def equals(o: Any) = o match {
523+
case m: SparseMatrix =>
524+
(m.numRows == numRows && m.numCols == numCols && Arrays.equals(colPtrs, m.colPtrs)
525+
&& Arrays.equals(rowIndices, m.rowIndices) && Arrays.equals(values, m.values))
526+
case _ => false
527+
}
528+
434529
override def copy = new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.clone())
435530

436531
private[mllib] def map(f: Double => Double) =

mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,4 +424,15 @@ class MatricesSuite extends FunSuite {
424424
assert(mat.rowIndices.toSeq === Seq(3, 0, 2, 1))
425425
assert(mat.values.toSeq === Seq(1.0, 2.0, 3.0, 4.0))
426426
}
427+
428+
test("MatrixUDT") {
429+
val dm1 = new DenseMatrix(2, 2, Array(0.9, 1.2, 2.3, 9.8))
430+
val dm2 = new DenseMatrix(0, 0, Array())
431+
val sm1 = dm1.toSparse
432+
val sm2 = dm2.toSparse
433+
val mUDT = new MatrixUDT()
434+
Seq(dm1, dm2, sm1, sm2).foreach {
435+
mat => assert(mat === mUDT.deserialize(mUDT.serialize(mat)))
436+
}
437+
}
427438
}

0 commit comments

Comments
 (0)