mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala (6 changes: 3 additions & 3 deletions)
@@ -215,10 +215,10 @@ object IDFModel extends MLReadable[IDFModel] {
val data = sparkSession.read.parquet(dataPath)

val model = if (majorVersion(metadata.sparkVersion) >= 3) {
- val Row(idf: Vector, df: Seq[_], numDocs: Long) = data.select("idf", "docFreq", "numDocs")
- .head()
+ val Row(idf: Vector, df: scala.collection.Seq[_], numDocs: Long) =
+ data.select("idf", "docFreq", "numDocs").head()
new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf),
- df.asInstanceOf[Seq[Long]].toArray, numDocs))
+ df.asInstanceOf[scala.collection.Seq[Long]].toArray, numDocs))
} else {
val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
.select("idf")
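Most hunks in this diff make the same move as the IDF change above: stop relying on the bare `Seq` alias when reading array-typed columns back out of a Row, since Scala 2.13 rebinds `Seq` to `scala.collection.immutable.Seq` while the values Spark hands back only satisfy `scala.collection.Seq`. A minimal sketch of the failure and of the portable form, written purely for illustration and assuming the column value arrives as a mutable wrapper (which is what Spark Rows typically hold for array columns):

    import org.apache.spark.sql.Row
    import scala.collection.mutable

    // Stand-in for what a DataFrame row usually holds for an array column.
    val row = Row(mutable.ArraySeq("a", "b", "c"))

    // Scala 2.12: `Seq` aliases scala.collection.Seq, so this read is harmless.
    // Scala 2.13: `Seq` aliases immutable.Seq, which an ArraySeq is not, so the
    // value fails with a ClassCastException once it is used as Seq[String].
    // val broken: Seq[String] = row.getAs[Seq[String]](0)

    // Portable on both versions: ask for the common supertype, copy only when needed.
    val values: scala.collection.Seq[String] = row.getAs[scala.collection.Seq[String]](0)
    val asArray: Array[String] = values.toArray

The `getAs[Seq[...]]` to `getSeq[...]` rewrites that appear throughout the diff serve the same purpose through Row's dedicated sequence accessor.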
@@ -224,7 +224,7 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] {

val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath).select("randCoefficients").head()
- val randCoefficients = data.getAs[Seq[Int]](0).grouped(2)
+ val randCoefficients = data.getSeq[Int](0).grouped(2)
.map(tuple => (tuple(0), tuple(1))).toArray
val model = new MinHashLSHModel(metadata.uid, randCoefficients)

@@ -449,7 +449,7 @@ object RFormulaModel extends MLReadable[RFormulaModel] {
val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath).select("label", "terms", "hasIntercept").head()
val label = data.getString(0)
- val terms = data.getAs[Seq[Seq[String]]](1)
+ val terms = data.getSeq[Seq[String]](1)
val hasIntercept = data.getBoolean(2)
val resolvedRFormula = ResolvedRFormula(label, terms, hasIntercept)

@@ -220,7 +220,8 @@ class StringIndexer @Since("1.4.0") (

val selectedCols = getSelectedCols(dataset, inputCols).map(collect_set(_))
val allLabels = dataset.select(selectedCols: _*)
- .collect().toSeq.flatMap(_.toSeq).asInstanceOf[Seq[Seq[String]]]
+ .collect().toSeq.flatMap(_.toSeq)
+ .asInstanceOf[scala.collection.Seq[scala.collection.Seq[String]]].toSeq
ThreadUtils.parmap(allLabels, "sortingStringLabels", 8) { labels =>
val sorted = labels.filter(_ != null).sorted
if (ascending) {
@@ -522,7 +523,7 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] {
val data = sparkSession.read.parquet(dataPath)
.select("labelsArray")
.head()
- data.getAs[Seq[Seq[String]]](0).map(_.toArray).toArray
+ data.getSeq[scala.collection.Seq[String]](0).map(_.toArray).toArray
}
val model = new StringIndexerModel(metadata.uid, labelsArray)
metadata.getAndSetParams(model)
@@ -300,7 +300,7 @@ class VectorIndexerModel private[ml] (
/** Java-friendly version of [[categoryMaps]] */
@Since("1.4.0")
def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = {
- categoryMaps.mapValues(_.asJava).asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
+ categoryMaps.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
}

/**
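The `.toMap` added above (and again in the Word2VecSuite hunks near the end) addresses a different 2.13 change: `Map.mapValues` now returns a lazy `MapView` rather than a `Map`, so the result can no longer be passed to `.asJava` and does not compare by contents in assertions. A small sketch of the idea, independent of the Spark classes involved:

    import scala.collection.JavaConverters._  // scala.jdk.CollectionConverters is the 2.13 name; this one still works cross-built

    val categoryMap = Map("a" -> Seq(1.0), "b" -> Seq(2.0))

    // 2.12: mapValues returns a Map.  2.13: it returns a lazy MapView, which is not
    // a scala.collection.Map, so .asJava stops compiling and equality compares the
    // views themselves rather than their contents.
    val doubled = categoryMap.mapValues(_.map(_ * 2))

    // Materialising the view is harmless on 2.12 and required on 2.13.
    val strict: Map[String, Seq[Double]] = doubled.toMap
    val javaMap: java.util.Map[String, Seq[Double]] = strict.asJava
    assert(strict == Map("a" -> Seq(2.0), "b" -> Seq(4.0)))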
@@ -169,7 +169,8 @@ final class Word2Vec @Since("1.4.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): Word2VecModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+ val input =
+ dataset.select($(inputCol)).rdd.map(_.getSeq[String](0))
val wordVectors = new feature.Word2Vec()
.setLearningRate($(stepSize))
.setMinCount($(minCount))
@@ -363,7 +363,7 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
Map.empty[Any, Double]
} else {
frequentItems.rdd.flatMap {
- case Row(items: Seq[_], count: Long) if items.length == 1 =>
+ case Row(items: scala.collection.Seq[_], count: Long) if items.length == 1 =>
Some(items.head -> count.toDouble / numTrainingRecords)
case _ => None
}.collectAsMap()
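The FPGrowth hunk above shows the quieter failure mode: in a pattern match, `items: Seq[_]` is a runtime type test, and under 2.13 it tests for `immutable.Seq`, so a row holding a mutable wrapper simply skips the case rather than throwing. A simplified sketch of that shape, with made-up values standing in for FPGrowth's (items, freq) rows:

    import org.apache.spark.sql.Row
    import scala.collection.mutable

    // ArraySeq stands in for the wrapper Spark typically uses for array columns.
    val r = Row(mutable.ArraySeq("milk"), 42L)

    val freq: Option[(Any, Double)] = r match {
      // Written as `items: Seq[_]`, this case would not match the row above on
      // Scala 2.13 and the record would silently land in the default branch.
      case Row(items: scala.collection.Seq[_], count: Long) if items.length == 1 =>
        Some(items.head -> count.toDouble)
      case _ => None
    }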
@@ -146,7 +146,7 @@ final class PrefixSpan(@Since("2.4.0") override val uid: String) extends Params

val data = dataset.select(sequenceColParam)
val sequences = data.where(col(sequenceColParam).isNotNull).rdd
- .map(r => r.getAs[Seq[Seq[Any]]](0).map(_.toArray).toArray)
+ .map(r => r.getSeq[scala.collection.Seq[Any]](0).map(_.toArray).toArray)

val mllibPrefixSpan = new mllibPrefixSpan()
.setMinSupport($(minSupport))
@@ -216,7 +216,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
val data = dataArray(0)
val labels = data.getAs[Seq[Double]](0).toArray
val pi = data.getAs[Seq[Double]](1).toArray
- val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
+ val theta = data.getSeq[scala.collection.Seq[Double]](2).map(_.toArray).toArray
val modelType = data.getString(3)
new NaiveBayesModel(labels, pi, theta, modelType)
}
@@ -260,7 +260,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
val data = dataArray(0)
val labels = data.getAs[Seq[Double]](0).toArray
val pi = data.getAs[Seq[Double]](1).toArray
- val theta = data.getAs[Seq[Seq[Double]]](2).map(_.toArray).toArray
+ val theta = data.getSeq[scala.collection.Seq[Double]](2).map(_.toArray).toArray
new NaiveBayesModel(labels, pi, theta)
}
}
@@ -683,7 +683,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {

def loadImpl[Item: ClassTag](freqSequences: DataFrame, sample: Item): PrefixSpanModel[Item] = {
val freqSequencesRDD = freqSequences.select("sequence", "freq").rdd.map { x =>
- val sequence = x.getAs[Seq[Seq[Item]]](0).map(_.toArray).toArray
+ val sequence = x.getSeq[scala.collection.Seq[Item]](0).map(_.toArray).toArray
val freq = x.getLong(1)
new PrefixSpan.FreqSequence(sequence, freq)
}
@@ -386,12 +386,12 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
assert(formatVersion == thisFormatVersion)
val rank = (metadata \ "rank").extract[Int]
val userFeatures = spark.read.parquet(userPath(path)).rdd.map {
- case Row(id: Int, features: Seq[_]) =>
- (id, features.asInstanceOf[Seq[Double]].toArray)
+ case Row(id: Int, features: scala.collection.Seq[_]) =>
+ (id, features.asInstanceOf[scala.collection.Seq[Double]].toArray)
}
val productFeatures = spark.read.parquet(productPath(path)).rdd.map {
- case Row(id: Int, features: Seq[_]) =>
- (id, features.asInstanceOf[Seq[Double]].toArray)
+ case Row(id: Int, features: scala.collection.Seq[_]) =>
+ (id, features.asInstanceOf[scala.collection.Seq[Double]].toArray)
}
new MatrixFactorizationModel(rank, userFeatures, productFeatures)
}
@@ -164,7 +164,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
}

def apply(r: Row): SplitData = {
- SplitData(r.getInt(0), r.getDouble(1), r.getInt(2), r.getAs[Seq[Double]](3))
+ SplitData(r.getInt(0), r.getDouble(1), r.getInt(2), r.getSeq[Double](3))
}
}

@@ -199,11 +199,11 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
assert(topics.count() === k)
assert(topics.select("topic").rdd.map(_.getInt(0)).collect().toSet === Range(0, k).toSet)
topics.select("termIndices").collect().foreach { case r: Row =>
- val termIndices = r.getAs[Seq[Int]](0)
+ val termIndices = r.getSeq[Int](0)
assert(termIndices.length === 3 && termIndices.toSet.size === 3)
}
topics.select("termWeights").collect().foreach { case r: Row =>
- val termWeights = r.getAs[Seq[Double]](0)
+ val termWeights = r.getSeq[Double](0)
assert(termWeights.length === 3 && termWeights.forall(w => w >= 0.0 && w <= 1.0))
}
}
@@ -115,7 +115,7 @@ class BucketedRandomProjectionLSHSuite extends MLTest with DefaultReadWriteTest
val brpModel = brp.fit(dataset)

testTransformer[Tuple1[Vector]](dataset.toDF(), brpModel, "values") {
- case Row(values: Seq[_]) =>
+ case Row(values: scala.collection.Seq[_]) =>
assert(values.length === brp.getNumHashTables)
}
}
@@ -71,7 +71,8 @@ private[ml] object LSHTest {
transformedData.schema, model.getOutputCol, DataTypes.createArrayType(new VectorUDT))

// Check output column dimensions
- val headHashValue = transformedData.select(outputCol).head().get(0).asInstanceOf[Seq[Vector]]
+ val headHashValue =
+ transformedData.select(outputCol).head().get(0).asInstanceOf[scala.collection.Seq[Vector]]
assert(headHashValue.length == model.getNumHashTables)

// Perform a cross join and label each pair of same_bucket and distance
@@ -182,7 +182,7 @@ class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {
val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
model.set(model.inputCol, "keys")
testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
- case Row(_: Vector, output: Seq[_]) =>
+ case Row(_: Vector, output: scala.collection.Seq[_]) =>
assert(output.length === model.randCoefficients.length)
// no AND-amplification yet: SPARK-18450, so each hash output is of length 1
output.foreach {
@@ -83,7 +83,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {

def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
- case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
+ case Row(actualNGrams : scala.collection.Seq[_], wantedNGrams: scala.collection.Seq[_]) =>
assert(actualNGrams === wantedNGrams)
}
}
@@ -29,7 +29,7 @@ class StopWordsRemoverSuite extends MLTest with DefaultReadWriteTest {

def testStopWordsRemover(t: StopWordsRemover, dataFrame: DataFrame): Unit = {
testTransformer[(Array[String], Array[String])](dataFrame, t, "filtered", "expected") {
- case Row(tokens: Seq[_], wantedTokens: Seq[_]) =>
+ case Row(tokens: scala.collection.Seq[_], wantedTokens: scala.collection.Seq[_]) =>
assert(tokens === wantedTokens)
}
}
@@ -242,7 +242,8 @@ class StopWordsRemoverSuite extends MLTest with DefaultReadWriteTest {
remover.transform(df)
.select("filtered1", "expected1", "filtered2", "expected2")
.collect().foreach {
- case Row(r1: Seq[_], e1: Seq[_], r2: Seq[_], e2: Seq[_]) =>
+ case Row(r1: scala.collection.Seq[_], e1: scala.collection.Seq[_],
+ r2: scala.collection.Seq[_], e2: scala.collection.Seq[_]) =>
assert(r1 === e1,
s"The result value is not correct after bucketing. Expected $e1 but found $r1")
assert(r2 === e2,
@@ -268,7 +269,8 @@ class StopWordsRemoverSuite extends MLTest with DefaultReadWriteTest {
remover.transform(df)
.select("filtered1", "expected1", "filtered2", "expected2")
.collect().foreach {
- case Row(r1: Seq[_], e1: Seq[_], r2: Seq[_], e2: Seq[_]) =>
+ case Row(r1: scala.collection.Seq[_], e1: scala.collection.Seq[_],
+ r2: scala.collection.Seq[_], e2: scala.collection.Seq[_]) =>
assert(r1 === e1,
s"The result value is not correct after bucketing. Expected $e1 but found $r1")
assert(r2 === e2,
@@ -121,7 +121,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul

val prediction = model.transform(
spark.createDataFrame(Seq(Tuple1(Array("1", "2")))).toDF("items")
- ).first().getAs[Seq[String]]("prediction")
+ ).first().getAs[scala.collection.Seq[String]]("prediction")

assert(prediction === Seq("3"))
}
@@ -175,7 +175,7 @@ class RandomForestRegressorSuite extends MLTest with DefaultReadWriteTest{
val testParams = Seq(
(50, 5, 1.0, 0.75),
(50, 10, 1.0, 0.75),
- (50, 10, 0.95, 0.78)
+ (50, 10, 0.95, 0.75)
)

for ((numTrees, maxDepth, subsamplingRate, tol) <- testParams) {
@@ -47,7 +47,7 @@ class MLTestSuite extends MLTest {
}
intercept[Exception] {
testTransformerOnStreamData[(Int, String)](data, indexerModel, "id", "indexed") {
- rows: Seq[Row] =>
+ rows: scala.collection.Seq[Row] =>
assert(rows.map(_.getDouble(1)).max === 1.0)
}
}
@@ -43,7 +43,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
// and a Word2VecMap give the same values.
val word2VecMap = model.getVectors
val newModel = new Word2VecModel(word2VecMap)
- assert(newModel.getVectors.mapValues(_.toSeq) === word2VecMap.mapValues(_.toSeq))
+ assert(newModel.getVectors.mapValues(_.toSeq).toMap ===
+ word2VecMap.mapValues(_.toSeq).toMap)
}

test("Word2Vec throws exception when vocabulary is empty") {
@@ -102,7 +103,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
try {
model.save(sc, path)
val sameModel = Word2VecModel.load(sc, path)
- assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq))
+ assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
+ model.getVectors.mapValues(_.toSeq).toMap)
} finally {
Utils.deleteRecursively(tempDir)
}
@@ -136,7 +138,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
try {
model.save(sc, path)
val sameModel = Word2VecModel.load(sc, path)
- assert(sameModel.getVectors.mapValues(_.toSeq) === model.getVectors.mapValues(_.toSeq))
+ assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
+ model.getVectors.mapValues(_.toSeq).toMap)
}
catch {
case t: Throwable => fail("exception thrown persisting a model " +