diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index b6909b3386b7..bfbede1057a3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
@@ -34,14 +33,15 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.{Utils, VersionUtils}
 
 /**
- * Params for [[Word2Vec]] and [[Word2VecModel]].
- */
+  * Params for [[Word2Vec]] and [[Word2VecModel]].
+  */
 private[feature] trait Word2VecBase extends Params
   with HasInputCol with HasOutputCol with HasMaxIter with HasStepSize with HasSeed {
 
   /**
    * The dimension of the code that you want to transform from words.
    * Default: 100
+   *
    * @group param
    */
   final val vectorSize = new IntParam(
@@ -55,6 +55,7 @@ private[feature] trait Word2VecBase extends Params
   /**
    * The window size (context words from [-window, window]).
    * Default: 5
+   *
    * @group expertParam
    */
   final val windowSize = new IntParam(
@@ -68,6 +69,7 @@ private[feature] trait Word2VecBase extends Params
   /**
    * Number of partitions for sentences of words.
    * Default: 1
+   *
    * @group param
    */
   final val numPartitions = new IntParam(
@@ -78,10 +80,19 @@ private[feature] trait Word2VecBase extends Params
   /** @group getParam */
   def getNumPartitions: Int = $(numPartitions)
 
+  final val supplementaryWords = new StringArrayParam(this,
+    "supplementaryWords", "additional training data for unsupervised vector learning")
+
+  setDefault(supplementaryWords -> Array[String]())
+
+  /** @group getParam */
+  def getSupplementaryWords: Array[String] = $(supplementaryWords)
+
   /**
    * The minimum number of times a token must appear to be included in the word2vec model's
    * vocabulary.
    * Default: 5
+   *
    * @group param
    */
   final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " +
@@ -96,6 +107,7 @@ private[feature] trait Word2VecBase extends Params
    * Any sentence longer than this threshold will be divided into chunks of
    * up to `maxSentenceLength` size.
    * Default: 1000
+   *
    * @group param
    */
   final val maxSentenceLength = new IntParam(this, "maxSentenceLength", "Maximum length " +
@@ -120,12 +132,12 @@ private[feature] trait Word2VecBase extends Params
 }
 
 /**
- * Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further
- * natural language processing or machine learning process.
- */
+  * Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further
+  * natural language processing or machine learning process.
+  */
 @Since("1.4.0")
-final class Word2Vec @Since("1.4.0") (
-    @Since("1.4.0") override val uid: String)
+final class Word2Vec @Since("1.4.0")(
+  @Since("1.4.0") override val uid: String)
   extends Estimator[Word2VecModel] with Word2VecBase with DefaultParamsWritable {
 
   @Since("1.4.0")
@@ -155,6 +167,10 @@ final class Word2Vec @Since("1.4.0") (
   @Since("1.4.0")
   def setNumPartitions(value: Int): this.type = set(numPartitions, value)
 
+  /** @group setParam */
+  @Since("2.3.0")
+  def setSupplementaryWords(value: Array[String]): this.type = set(supplementaryWords, value)
+
   /** @group setParam */
   @Since("1.4.0")
   def setMaxIter(value: Int): this.type = set(maxIter, value)
@@ -175,6 +191,7 @@ final class Word2Vec @Since("1.4.0") (
   override def fit(dataset: Dataset[_]): Word2VecModel = {
     transformSchema(dataset.schema, logging = true)
     val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
+      .zipWithIndex().map(f => if (f._2 == 0) f._1 ++ $(supplementaryWords) else f._1)
     val wordVectors = new feature.Word2Vec()
       .setLearningRate($(stepSize))
       .setMinCount($(minCount))
@@ -208,9 +225,9 @@ object Word2Vec extends DefaultParamsReadable[Word2Vec] {
  * Model fitted by [[Word2Vec]].
  */
 @Since("1.4.0")
-class Word2VecModel private[ml] (
-    @Since("1.4.0") override val uid: String,
-    @transient private val wordVectors: feature.Word2VecModel)
+class Word2VecModel private[ml](
+  @Since("1.4.0") override val uid: String,
+  @transient private val wordVectors: feature.Word2VecModel)
   extends Model[Word2VecModel] with Word2VecBase with MLWritable {
 
   import Word2VecModel._
@@ -229,8 +246,9 @@ class Word2VecModel private[ml] (
   /**
    * Find "num" number of words closest in similarity to the given word, not
    * including the word itself.
+   *
    * @return a dataframe with columns "word" and "similarity" of the word and the cosine
-   *         similarities between the synonyms and the given word vector.
+   *   similarities between the synonyms and the given word vector.
    */
   @Since("1.5.0")
   def findSynonyms(word: String, num: Int): DataFrame = {
@@ -242,8 +260,9 @@ class Word2VecModel private[ml] (
    * Find "num" number of words whose vector representation is most similar to the supplied vector.
    * If the supplied vector is the vector representation of a word in the model's vocabulary,
    * that word will be in the results.
+   *
    * @return a dataframe with columns "word" and "similarity" of the word and the cosine
-   *         similarities between the synonyms and the given word vector.
+   *   similarities between the synonyms and the given word vector.
    */
   @Since("2.0.0")
   def findSynonyms(vec: Vector, num: Int): DataFrame = {
@@ -255,8 +274,9 @@ class Word2VecModel private[ml] (
    * Find "num" number of words whose vector representation is most similar to the supplied vector.
    * If the supplied vector is the vector representation of a word in the model's vocabulary,
    * that word will be in the results.
+   *
    * @return an array of the words and the cosine similarities between the synonyms given
-   *         word vector.
+   *   word vector.
    */
   @Since("2.2.0")
   def findSynonymsArray(vec: Vector, num: Int): Array[(String, Double)] = {
@@ -266,8 +286,9 @@ class Word2VecModel private[ml] (
   /**
    * Find "num" number of words closest in similarity to the given word, not
    * including the word itself.
+   *
    * @return an array of the words and the cosine similarities between the synonyms given
-   *         word vector.
+   *   word vector.
    */
   @Since("2.2.0")
   def findSynonymsArray(word: String, num: Int): Array[(String, Double)] = {
@@ -357,15 +378,16 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
    * Calculate the number of partitions to use in saving the model.
    * [SPARK-11994] - We want to partition the model in partitions smaller than
    * spark.kryoserializer.buffer.max
-   * @param bufferSizeInBytes  Set to spark.kryoserializer.buffer.max
-   * @param numWords  Vocab size
-   * @param vectorSize  Vector length for each word
+   *
+   * @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max
+   * @param numWords Vocab size
+   * @param vectorSize Vector length for each word
    */
   def calculateNumberOfPartitions(
-      bufferSizeInBytes: Long,
-      numWords: Int,
-      vectorSize: Int): Int = {
-    val floatSize = 4L  // Use Long to help avoid overflow
+    bufferSizeInBytes: Long,
+    numWords: Int,
+    vectorSize: Int): Int = {
+    val floatSize = 4L // Use Long to help avoid overflow
     val averageWordSize = 15
     // Calculate the approximate size of the model.
     // Assuming an average word size of 15 bytes, the formula is:
@@ -418,4 +440,4 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
 
   @Since("1.6.0")
   override def load(path: String): Word2VecModel = super.load(path)
-}
+}
\ No newline at end of file
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
index 6183606a7b2a..b590a1e22359 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -76,6 +76,50 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
     }
   }
 
+  test("additionalData") {
+
+    val spark = this.spark
+    import spark.implicits._
+
+    val sentence = "a b " * 100 + "a c " * 10
+    val additionalData = "a b c a c " * 100
+    val numOfWords = (sentence + additionalData).split(" ").length
+    val doc = sc.parallelize(Seq(sentence, sentence))
+      .map(line => line.split(" "))
+
+    val codes = Map(
+      "a" -> Array(-0.2811822295188904, -0.6356269121170044, -0.3020961284637451),
+      "b" -> Array(1.0309048891067505, -1.29472815990448, 0.22276712954044342),
+      "c" -> Array(-0.08456747233867645, 0.5137411952018738, 0.11731560528278351)
+    )
+
+    val expected = doc.map { sentence =>
+      Vectors.dense(sentence.map(codes.apply).reduce((word1, word2) =>
+        word1.zip(word2).map { case (v1, v2) => v1 + v2 }
+      ).map(_ / numOfWords))
+    }
+
+    val docDF = doc.zip(expected).toDF("text", "expected")
+
+    val w2v = new Word2Vec()
+      .setVectorSize(3)
+      .setInputCol("text")
+      .setSupplementaryWords(additionalData.split(" "))
+      .setOutputCol("result")
+      .setSeed(42L)
+    val model = w2v.fit(docDF)
+
+    MLTestingUtils.checkCopyAndUids(w2v, model)
+
+    // These expectations are just magic values, characterizing the current
+    // behavior. The test needs to be updated to be more general, see SPARK-11502
+    val magicExp = Vectors.dense(0.6622298123653639, -0.4716927178881385, 0.1346504972739653)
+    model.transform(docDF).select("result", "expected").collect().foreach {
+      case Row(vector1: Vector, vector2: Vector) =>
+        assert(vector1 ~== magicExp absTol 1E-5, "Transformed vector differs from expected.")
+    }
+  }
+
   test("getVectors") {
 
     val spark = this.spark