From 7473d6f81b9db4f1c4afbcfd1fd3b76dda8ed579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leonard=20H=C3=B6velmann?= Date: Fri, 14 Jul 2017 14:10:31 +0200 Subject: [PATCH 1/2] added support for word2vec training with additional data --- .../apache/spark/ml/feature/Word2Vec.scala | 178 ++++++++++-------- .../spark/ml/feature/Word2VecSuite.scala | 44 +++++ 2 files changed, 144 insertions(+), 78 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index b6909b3386b7..4aa2799bd5bc 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -19,7 +19,6 @@ package org.apache.spark.ml.feature import org.apache.hadoop.fs.Path -import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT} @@ -34,16 +33,17 @@ import org.apache.spark.sql.types._ import org.apache.spark.util.{Utils, VersionUtils} /** - * Params for [[Word2Vec]] and [[Word2VecModel]]. - */ + * Params for [[Word2Vec]] and [[Word2VecModel]]. + */ private[feature] trait Word2VecBase extends Params with HasInputCol with HasOutputCol with HasMaxIter with HasStepSize with HasSeed { /** - * The dimension of the code that you want to transform from words. - * Default: 100 - * @group param - */ + * The dimension of the code that you want to transform from words. + * Default: 100 + * + * @group param + */ final val vectorSize = new IntParam( this, "vectorSize", "the dimension of codes after transforming from words (> 0)", ParamValidators.gt(0)) @@ -53,10 +53,11 @@ private[feature] trait Word2VecBase extends Params def getVectorSize: Int = $(vectorSize) /** - * The window size (context words from [-window, window]). 
- * Default: 5 - * @group expertParam - */ + * The window size (context words from [-window, window]). + * Default: 5 + * + * @group expertParam + */ final val windowSize = new IntParam( this, "windowSize", "the window size (context words from [-window, window]) (> 0)", ParamValidators.gt(0)) @@ -66,10 +67,11 @@ private[feature] trait Word2VecBase extends Params def getWindowSize: Int = $(windowSize) /** - * Number of partitions for sentences of words. - * Default: 1 - * @group param - */ + * Number of partitions for sentences of words. + * Default: 1 + * + * @group param + */ final val numPartitions = new IntParam( this, "numPartitions", "number of partitions for sentences of words (> 0)", ParamValidators.gt(0)) @@ -78,12 +80,21 @@ private[feature] trait Word2VecBase extends Params /** @group getParam */ def getNumPartitions: Int = $(numPartitions) + final val supplementaryWords = new StringArrayParam(this, + "supplementaryWordsCol", "additional training data for unsupervised vector learning") + + setDefault(supplementaryWords -> Array[String]()) + + /** @group getParam */ + def getSupplementaryWords: Array[String] = $(supplementaryWords) + /** - * The minimum number of times a token must appear to be included in the word2vec model's - * vocabulary. - * Default: 5 - * @group param - */ + * The minimum number of times a token must appear to be included in the word2vec model's + * vocabulary. + * Default: 5 + * + * @group param + */ final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " + "appear to be included in the word2vec model's vocabulary (>= 0)", ParamValidators.gtEq(0)) setDefault(minCount -> 5) @@ -92,12 +103,13 @@ private[feature] trait Word2VecBase extends Params def getMinCount: Int = $(minCount) /** - * Sets the maximum length (in words) of each sentence in the input data. - * Any sentence longer than this threshold will be divided into chunks of - * up to `maxSentenceLength` size. 
- * Default: 1000 - * @group param - */ + * Sets the maximum length (in words) of each sentence in the input data. + * Any sentence longer than this threshold will be divided into chunks of + * up to `maxSentenceLength` size. + * Default: 1000 + * + * @group param + */ final val maxSentenceLength = new IntParam(this, "maxSentenceLength", "Maximum length " + "(in words) of each sentence in the input data. Any sentence longer than this threshold will " + "be divided into chunks up to the size (> 0)", ParamValidators.gt(0)) @@ -110,8 +122,8 @@ private[feature] trait Word2VecBase extends Params setDefault(maxIter -> 1) /** - * Validate and transform the input schema. - */ + * Validate and transform the input schema. + */ protected def validateAndTransformSchema(schema: StructType): StructType = { val typeCandidates = List(new ArrayType(StringType, true), new ArrayType(StringType, false)) SchemaUtils.checkColumnTypes(schema, $(inputCol), typeCandidates) @@ -120,12 +132,12 @@ private[feature] trait Word2VecBase extends Params } /** - * Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further - * natural language processing or machine learning process. - */ + * Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further + * natural language processing or machine learning process. 
+ */ @Since("1.4.0") -final class Word2Vec @Since("1.4.0") ( - @Since("1.4.0") override val uid: String) +final class Word2Vec @Since("1.4.0")( + @Since("1.4.0") override val uid: String) extends Estimator[Word2VecModel] with Word2VecBase with DefaultParamsWritable { @Since("1.4.0") @@ -155,6 +167,10 @@ final class Word2Vec @Since("1.4.0") ( @Since("1.4.0") def setNumPartitions(value: Int): this.type = set(numPartitions, value) + /** @group setParam */ + @Since("2.3.0") + def setSupplementaryWords(value: Array[String]): this.type = set(supplementaryWords, value) + /** @group setParam */ @Since("1.4.0") def setMaxIter(value: Int): this.type = set(maxIter, value) @@ -175,6 +191,7 @@ final class Word2Vec @Since("1.4.0") ( override def fit(dataset: Dataset[_]): Word2VecModel = { transformSchema(dataset.schema, logging = true) val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0)) + .zipWithIndex().map(f => if (f._2 == 0) f._1 ++ $(supplementaryWords) else f._1) val wordVectors = new feature.Word2Vec() .setLearningRate($(stepSize)) .setMinCount($(minCount)) @@ -205,20 +222,20 @@ object Word2Vec extends DefaultParamsReadable[Word2Vec] { } /** - * Model fitted by [[Word2Vec]]. - */ + * Model fitted by [[Word2Vec]]. + */ @Since("1.4.0") -class Word2VecModel private[ml] ( - @Since("1.4.0") override val uid: String, - @transient private val wordVectors: feature.Word2VecModel) +class Word2VecModel private[ml]( + @Since("1.4.0") override val uid: String, + @transient private val wordVectors: feature.Word2VecModel) extends Model[Word2VecModel] with Word2VecBase with MLWritable { import Word2VecModel._ /** - * Returns a dataframe with two fields, "word" and "vector", with "word" being a String and - * and the vector the DenseVector that it is mapped to. - */ + * Returns a dataframe with two fields, "word" and "vector", with "word" being a String and + * and the vector the DenseVector that it is mapped to. 
+ */ @Since("1.5.0") @transient lazy val getVectors: DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -227,11 +244,12 @@ class Word2VecModel private[ml] ( } /** - * Find "num" number of words closest in similarity to the given word, not - * including the word itself. - * @return a dataframe with columns "word" and "similarity" of the word and the cosine - * similarities between the synonyms and the given word vector. - */ + * Find "num" number of words closest in similarity to the given word, not + * including the word itself. + * + * @return a dataframe with columns "word" and "similarity" of the word and the cosine + * similarities between the synonyms and the given word vector. + */ @Since("1.5.0") def findSynonyms(word: String, num: Int): DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -239,12 +257,13 @@ class Word2VecModel private[ml] ( } /** - * Find "num" number of words whose vector representation is most similar to the supplied vector. - * If the supplied vector is the vector representation of a word in the model's vocabulary, - * that word will be in the results. - * @return a dataframe with columns "word" and "similarity" of the word and the cosine - * similarities between the synonyms and the given word vector. - */ + * Find "num" number of words whose vector representation is most similar to the supplied vector. + * If the supplied vector is the vector representation of a word in the model's vocabulary, + * that word will be in the results. + * + * @return a dataframe with columns "word" and "similarity" of the word and the cosine + * similarities between the synonyms and the given word vector. + */ @Since("2.0.0") def findSynonyms(vec: Vector, num: Int): DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -252,23 +271,25 @@ class Word2VecModel private[ml] ( } /** - * Find "num" number of words whose vector representation is most similar to the supplied vector. 
- * If the supplied vector is the vector representation of a word in the model's vocabulary, - * that word will be in the results. - * @return an array of the words and the cosine similarities between the synonyms given - * word vector. - */ + * Find "num" number of words whose vector representation is most similar to the supplied vector. + * If the supplied vector is the vector representation of a word in the model's vocabulary, + * that word will be in the results. + * + * @return an array of the words and the cosine similarities between the synonyms given + * word vector. + */ @Since("2.2.0") def findSynonymsArray(vec: Vector, num: Int): Array[(String, Double)] = { wordVectors.findSynonyms(vec, num) } /** - * Find "num" number of words closest in similarity to the given word, not - * including the word itself. - * @return an array of the words and the cosine similarities between the synonyms given - * word vector. - */ + * Find "num" number of words closest in similarity to the given word, not + * including the word itself. + * + * @return an array of the words and the cosine similarities between the synonyms given + * word vector. + */ @Since("2.2.0") def findSynonymsArray(word: String, num: Int): Array[(String, Double)] = { wordVectors.findSynonyms(word, num) @@ -283,9 +304,9 @@ class Word2VecModel private[ml] ( def setOutputCol(value: String): this.type = set(outputCol, value) /** - * Transform a sentence column to a vector column to represent the whole sentence. The transform - * is performed by averaging all word vectors it contains. - */ + * Transform a sentence column to a vector column to represent the whole sentence. The transform + * is performed by averaging all word vectors it contains. 
+ */ @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) @@ -354,18 +375,19 @@ object Word2VecModel extends MLReadable[Word2VecModel] { private[feature] object Word2VecModelWriter { /** - * Calculate the number of partitions to use in saving the model. - * [SPARK-11994] - We want to partition the model in partitions smaller than - * spark.kryoserializer.buffer.max - * @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max - * @param numWords Vocab size - * @param vectorSize Vector length for each word - */ + * Calculate the number of partitions to use in saving the model. + * [SPARK-11994] - We want to partition the model in partitions smaller than + * spark.kryoserializer.buffer.max + * + * @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max + * @param numWords Vocab size + * @param vectorSize Vector length for each word + */ def calculateNumberOfPartitions( - bufferSizeInBytes: Long, - numWords: Int, - vectorSize: Int): Int = { - val floatSize = 4L // Use Long to help avoid overflow + bufferSizeInBytes: Long, + numWords: Int, + vectorSize: Int): Int = { + val floatSize = 4L // Use Long to help avoid overflow val averageWordSize = 15 // Calculate the approximate size of the model. 
// Assuming an average word size of 15 bytes, the formula is: @@ -418,4 +440,4 @@ object Word2VecModel extends MLReadable[Word2VecModel] { @Since("1.6.0") override def load(path: String): Word2VecModel = super.load(path) -} +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala index 6183606a7b2a..b590a1e22359 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala @@ -76,6 +76,50 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul } } + test("additionalData") { + + val spark = this.spark + import spark.implicits._ + + val sentence = "a b " * 100 + "a c " * 10 + val additionalData = "a b c a c" * 100 + val numOfWords = (sentence + additionalData).split(" ").length + val doc = sc.parallelize(Seq(sentence, sentence)) + .map(line => line.split(" ")) + + val codes = Map( + "a" -> Array(-0.2811822295188904, -0.6356269121170044, -0.3020961284637451), + "b" -> Array(1.0309048891067505, -1.29472815990448, 0.22276712954044342), + "c" -> Array(-0.08456747233867645, 0.5137411952018738, 0.11731560528278351) + ) + + val expected = doc.map { sentence => + Vectors.dense(sentence.map(codes.apply).reduce((word1, word2) => + word1.zip(word2).map { case (v1, v2) => v1 + v2 } + ).map(_ / numOfWords)) + } + + val docDF = doc.zip(expected).toDF("text", "expected") + + val w2v = new Word2Vec() + .setVectorSize(3) + .setInputCol("text") + .setSupplementaryWords(additionalData.split(" ")) + .setOutputCol("result") + .setSeed(42L) + val model = w2v.fit(docDF) + + MLTestingUtils.checkCopyAndUids(w2v, model) + + // These expectations are just magic values, characterizing the current + // behavior. 
The test needs to be updated to be more general, see SPARK-11502 + val magicExp = Vectors.dense(0.6622298123653639, -0.4716927178881385, 0.1346504972739653) + model.transform(docDF).select("result", "expected").collect().foreach { + case Row(vector1: Vector, vector2: Vector) => + assert(vector1 ~== magicExp absTol 1E-5, "Transformed vector is different with expected.") + } + } + test("getVectors") { val spark = this.spark From 9979214fe0652ea852dec863146ea9b680c0d61c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leonard=20H=C3=B6velmann?= Date: Fri, 14 Jul 2017 14:19:11 +0200 Subject: [PATCH 2/2] fixed indentation --- .../apache/spark/ml/feature/Word2Vec.scala | 144 +++++++++--------- 1 file changed, 72 insertions(+), 72 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index 4aa2799bd5bc..bfbede1057a3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -39,11 +39,11 @@ private[feature] trait Word2VecBase extends Params with HasInputCol with HasOutputCol with HasMaxIter with HasStepSize with HasSeed { /** - * The dimension of the code that you want to transform from words. - * Default: 100 - * - * @group param - */ + * The dimension of the code that you want to transform from words. + * Default: 100 + * + * @group param + */ final val vectorSize = new IntParam( this, "vectorSize", "the dimension of codes after transforming from words (> 0)", ParamValidators.gt(0)) @@ -53,11 +53,11 @@ private[feature] trait Word2VecBase extends Params def getVectorSize: Int = $(vectorSize) /** - * The window size (context words from [-window, window]). - * Default: 5 - * - * @group expertParam - */ + * The window size (context words from [-window, window]). 
+ * Default: 5 + * + * @group expertParam + */ final val windowSize = new IntParam( this, "windowSize", "the window size (context words from [-window, window]) (> 0)", ParamValidators.gt(0)) @@ -67,11 +67,11 @@ private[feature] trait Word2VecBase extends Params def getWindowSize: Int = $(windowSize) /** - * Number of partitions for sentences of words. - * Default: 1 - * - * @group param - */ + * Number of partitions for sentences of words. + * Default: 1 + * + * @group param + */ final val numPartitions = new IntParam( this, "numPartitions", "number of partitions for sentences of words (> 0)", ParamValidators.gt(0)) @@ -89,12 +89,12 @@ private[feature] trait Word2VecBase extends Params def getSupplementaryWords: Array[String] = $(supplementaryWords) /** - * The minimum number of times a token must appear to be included in the word2vec model's - * vocabulary. - * Default: 5 - * - * @group param - */ + * The minimum number of times a token must appear to be included in the word2vec model's + * vocabulary. + * Default: 5 + * + * @group param + */ final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " + "appear to be included in the word2vec model's vocabulary (>= 0)", ParamValidators.gtEq(0)) setDefault(minCount -> 5) @@ -103,13 +103,13 @@ private[feature] trait Word2VecBase extends Params def getMinCount: Int = $(minCount) /** - * Sets the maximum length (in words) of each sentence in the input data. - * Any sentence longer than this threshold will be divided into chunks of - * up to `maxSentenceLength` size. - * Default: 1000 - * - * @group param - */ + * Sets the maximum length (in words) of each sentence in the input data. + * Any sentence longer than this threshold will be divided into chunks of + * up to `maxSentenceLength` size. + * Default: 1000 + * + * @group param + */ final val maxSentenceLength = new IntParam(this, "maxSentenceLength", "Maximum length " + "(in words) of each sentence in the input data. 
Any sentence longer than this threshold will " + "be divided into chunks up to the size (> 0)", ParamValidators.gt(0)) @@ -122,8 +122,8 @@ private[feature] trait Word2VecBase extends Params setDefault(maxIter -> 1) /** - * Validate and transform the input schema. - */ + * Validate and transform the input schema. + */ protected def validateAndTransformSchema(schema: StructType): StructType = { val typeCandidates = List(new ArrayType(StringType, true), new ArrayType(StringType, false)) SchemaUtils.checkColumnTypes(schema, $(inputCol), typeCandidates) @@ -222,8 +222,8 @@ object Word2Vec extends DefaultParamsReadable[Word2Vec] { } /** - * Model fitted by [[Word2Vec]]. - */ + * Model fitted by [[Word2Vec]]. + */ @Since("1.4.0") class Word2VecModel private[ml]( @Since("1.4.0") override val uid: String, @@ -233,9 +233,9 @@ class Word2VecModel private[ml]( import Word2VecModel._ /** - * Returns a dataframe with two fields, "word" and "vector", with "word" being a String and - * and the vector the DenseVector that it is mapped to. - */ + * Returns a dataframe with two fields, "word" and "vector", with "word" being a String and + * and the vector the DenseVector that it is mapped to. + */ @Since("1.5.0") @transient lazy val getVectors: DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -244,12 +244,12 @@ class Word2VecModel private[ml]( } /** - * Find "num" number of words closest in similarity to the given word, not - * including the word itself. - * - * @return a dataframe with columns "word" and "similarity" of the word and the cosine - * similarities between the synonyms and the given word vector. - */ + * Find "num" number of words closest in similarity to the given word, not + * including the word itself. + * + * @return a dataframe with columns "word" and "similarity" of the word and the cosine + * similarities between the synonyms and the given word vector. 
+ */ @Since("1.5.0") def findSynonyms(word: String, num: Int): DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -257,13 +257,13 @@ class Word2VecModel private[ml]( } /** - * Find "num" number of words whose vector representation is most similar to the supplied vector. - * If the supplied vector is the vector representation of a word in the model's vocabulary, - * that word will be in the results. - * - * @return a dataframe with columns "word" and "similarity" of the word and the cosine - * similarities between the synonyms and the given word vector. - */ + * Find "num" number of words whose vector representation is most similar to the supplied vector. + * If the supplied vector is the vector representation of a word in the model's vocabulary, + * that word will be in the results. + * + * @return a dataframe with columns "word" and "similarity" of the word and the cosine + * similarities between the synonyms and the given word vector. + */ @Since("2.0.0") def findSynonyms(vec: Vector, num: Int): DataFrame = { val spark = SparkSession.builder().getOrCreate() @@ -271,25 +271,25 @@ class Word2VecModel private[ml]( } /** - * Find "num" number of words whose vector representation is most similar to the supplied vector. - * If the supplied vector is the vector representation of a word in the model's vocabulary, - * that word will be in the results. - * - * @return an array of the words and the cosine similarities between the synonyms given - * word vector. - */ + * Find "num" number of words whose vector representation is most similar to the supplied vector. + * If the supplied vector is the vector representation of a word in the model's vocabulary, + * that word will be in the results. + * + * @return an array of the words and the cosine similarities between the synonyms given + * word vector. 
+ */ @Since("2.2.0") def findSynonymsArray(vec: Vector, num: Int): Array[(String, Double)] = { wordVectors.findSynonyms(vec, num) } /** - * Find "num" number of words closest in similarity to the given word, not - * including the word itself. - * - * @return an array of the words and the cosine similarities between the synonyms given - * word vector. - */ + * Find "num" number of words closest in similarity to the given word, not + * including the word itself. + * + * @return an array of the words and the cosine similarities between the synonyms given + * word vector. + */ @Since("2.2.0") def findSynonymsArray(word: String, num: Int): Array[(String, Double)] = { wordVectors.findSynonyms(word, num) @@ -304,9 +304,9 @@ class Word2VecModel private[ml]( def setOutputCol(value: String): this.type = set(outputCol, value) /** - * Transform a sentence column to a vector column to represent the whole sentence. The transform - * is performed by averaging all word vectors it contains. - */ + * Transform a sentence column to a vector column to represent the whole sentence. The transform + * is performed by averaging all word vectors it contains. + */ @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) @@ -375,14 +375,14 @@ object Word2VecModel extends MLReadable[Word2VecModel] { private[feature] object Word2VecModelWriter { /** - * Calculate the number of partitions to use in saving the model. - * [SPARK-11994] - We want to partition the model in partitions smaller than - * spark.kryoserializer.buffer.max - * - * @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max - * @param numWords Vocab size - * @param vectorSize Vector length for each word - */ + * Calculate the number of partitions to use in saving the model. 
+ * [SPARK-11994] - We want to partition the model in partitions smaller than + * spark.kryoserializer.buffer.max + * + * @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max + * @param numWords Vocab size + * @param vectorSize Vector length for each word + */ def calculateNumberOfPartitions( bufferSizeInBytes: Long, numWords: Int,