68 changes: 45 additions & 23 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml.feature

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
@@ -34,14 +33,15 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.{Utils, VersionUtils}

/**
* Params for [[Word2Vec]] and [[Word2VecModel]].
*/
* Params for [[Word2Vec]] and [[Word2VecModel]].
*/
private[feature] trait Word2VecBase extends Params
with HasInputCol with HasOutputCol with HasMaxIter with HasStepSize with HasSeed {

/**
* The dimension of the vector representation of each word.
* Default: 100
*
* @group param
*/
final val vectorSize = new IntParam(
@@ -55,6 +55,7 @@ private[feature] trait Word2VecBase extends Params
/**
* The window size (context words from [-window, window]).
* Default: 5
*
* @group expertParam
*/
final val windowSize = new IntParam(
@@ -68,6 +69,7 @@ private[feature] trait Word2VecBase extends Params
/**
* Number of partitions for sentences of words.
* Default: 1
*
* @group param
*/
final val numPartitions = new IntParam(
@@ -78,10 +80,19 @@ private[feature] trait Word2VecBase extends Params
/** @group getParam */
def getNumPartitions: Int = $(numPartitions)

/**
* Additional words appended to the training corpus, used as supplementary
* training data for unsupervised vector learning.
* Default: empty array
*
* @group param
*/
final val supplementaryWords = new StringArrayParam(this,
"supplementaryWords", "additional training data for unsupervised vector learning")

setDefault(supplementaryWords -> Array[String]())

/** @group getParam */
def getSupplementaryWords: Array[String] = $(supplementaryWords)

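For orientation (this sketch is not part of the diff): a minimal example of the new param in use, with made-up token names, assuming the `setSupplementaryWords` setter added to the estimator further down:

```scala
import org.apache.spark.ml.feature.Word2Vec

// Supplementary words are folded into the training corpus so that rare or
// out-of-corpus tokens still receive vectors; the default is an empty array.
val w2v = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setSupplementaryWords(Array("rareTokenA", "rareTokenB"))

assert(w2v.getSupplementaryWords.sameElements(Array("rareTokenA", "rareTokenB")))
```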
/**
* The minimum number of times a token must appear to be included in the word2vec model's
* vocabulary.
* Default: 5
*
* @group param
*/
final val minCount = new IntParam(this, "minCount", "the minimum number of times a token must " +
@@ -96,6 +107,7 @@ private[feature] trait Word2VecBase extends Params
* Any sentence longer than this threshold will be divided into chunks of
* up to `maxSentenceLength` size.
* Default: 1000
*
* @group param
*/
final val maxSentenceLength = new IntParam(this, "maxSentenceLength", "Maximum length " +
@@ -120,12 +132,12 @@ private[feature] trait Word2VecBase extends Params
}
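To make the parameter surface of this trait concrete, here is an illustrative configuration sketch (not part of the diff); the values shown are simply the documented defaults spelled out:

```scala
import org.apache.spark.ml.feature.Word2Vec

val configured = new Word2Vec()
  .setVectorSize(100)         // dimension of each word vector (default 100)
  .setWindowSize(5)           // context window [-5, 5] (default 5)
  .setNumPartitions(1)        // partitions for the sentence RDD (default 1)
  .setMinCount(5)             // drop tokens seen fewer than 5 times (default 5)
  .setMaxSentenceLength(1000) // chunk longer sentences (default 1000)
```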

/**
* Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further
* natural language processing or machine learning process.
*/
* Word2Vec trains a model of `Map(String, Vector)`, i.e. transforms a word into a code for further
* natural language processing or machine learning process.
*/
@Since("1.4.0")
final class Word2Vec @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
final class Word2Vec @Since("1.4.0")(
@Since("1.4.0") override val uid: String)
extends Estimator[Word2VecModel] with Word2VecBase with DefaultParamsWritable {

@Since("1.4.0")
@@ -155,6 +167,10 @@ final class Word2Vec @Since("1.4.0") (
@Since("1.4.0")
def setNumPartitions(value: Int): this.type = set(numPartitions, value)

/** @group setParam */
@Since("2.3.0")
def setSupplementaryWords(value: Array[String]): this.type = set(supplementaryWords, value)

/** @group setParam */
@Since("1.4.0")
def setMaxIter(value: Int): this.type = set(maxIter, value)
@@ -175,6 +191,7 @@
override def fit(dataset: Dataset[_]): Word2VecModel = {
transformSchema(dataset.schema, logging = true)
val input = dataset.select($(inputCol)).rdd.map(_.getAs[Seq[String]](0))
.zipWithIndex().map { case (sentence, idx) => if (idx == 0) sentence ++ $(supplementaryWords) else sentence }
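The `zipWithIndex` step above appends the supplementary words to exactly one record, the sentence at index 0, so they enter the vocabulary without being replicated into every sentence. A standalone sketch of the same trick, given a `SparkContext` `sc` and illustrative data:

```scala
// RDD of tokenized sentences; only the first record is augmented.
val sentences = sc.parallelize(Seq(Seq("a", "b"), Seq("c", "d")))
val extra = Array("x", "y")
val augmented = sentences.zipWithIndex().map { case (sentence, idx) =>
  if (idx == 0) sentence ++ extra else sentence
}
// augmented.collect() => Array(Seq("a", "b", "x", "y"), Seq("c", "d"))
```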
val wordVectors = new feature.Word2Vec()
.setLearningRate($(stepSize))
.setMinCount($(minCount))
@@ -208,9 +225,9 @@ object Word2Vec extends DefaultParamsReadable[Word2Vec] {
* Model fitted by [[Word2Vec]].
*/
@Since("1.4.0")
class Word2VecModel private[ml] (
@Since("1.4.0") override val uid: String,
@transient private val wordVectors: feature.Word2VecModel)
class Word2VecModel private[ml](
@Since("1.4.0") override val uid: String,
@transient private val wordVectors: feature.Word2VecModel)
extends Model[Word2VecModel] with Word2VecBase with MLWritable {

import Word2VecModel._
@@ -229,8 +246,9 @@ class Word2VecModel private[ml] (
/**
* Find "num" number of words closest in similarity to the given word, not
* including the word itself.
*
* @return a dataframe with columns "word" and "similarity" of the word and the cosine
* similarities between the synonyms and the given word vector.
* similarities between the synonyms and the given word vector.
*/
@Since("1.5.0")
def findSynonyms(word: String, num: Int): DataFrame = {
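A usage sketch for the DataFrame-returning overloads (not part of the diff; `model` is assumed to be a fitted `Word2VecModel` whose vocabulary contains "a"):

```scala
// Two words most similar to "a", excluding "a" itself. The returned
// DataFrame has columns "word" and "similarity" (cosine similarity).
val synonyms = model.findSynonyms("a", 2)
synonyms.show()
```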
@@ -242,8 +260,9 @@ class Word2VecModel private[ml] (
* Find "num" number of words whose vector representation is most similar to the supplied vector.
* If the supplied vector is the vector representation of a word in the model's vocabulary,
* that word will be in the results.
*
* @return a dataframe with columns "word" and "similarity" of the word and the cosine
* similarities between the synonyms and the given word vector.
* similarities between the synonyms and the given word vector.
*/
@Since("2.0.0")
def findSynonyms(vec: Vector, num: Int): DataFrame = {
@@ -255,8 +274,9 @@ class Word2VecModel private[ml] (
* Find "num" number of words whose vector representation is most similar to the supplied vector.
* If the supplied vector is the vector representation of a word in the model's vocabulary,
* that word will be in the results.
*
* @return an array of the words and the cosine similarities between the synonyms given
* word vector.
* word vector.
*/
@Since("2.2.0")
def findSynonymsArray(vec: Vector, num: Int): Array[(String, Double)] = {
@@ -266,8 +286,9 @@ class Word2VecModel private[ml] (
/**
* Find "num" number of words closest in similarity to the given word, not
* including the word itself.
*
* @return an array of the words and the cosine similarities between the synonyms given
* word vector.
* word vector.
*/
@Since("2.2.0")
def findSynonymsArray(word: String, num: Int): Array[(String, Double)] = {
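Similarly, a hedged sketch of the Array-returning variants, which hand the results back on the driver without a DataFrame round trip (`model` is again an assumed fitted model):

```scala
// (word, cosine similarity) pairs, most similar first.
val nearest: Array[(String, Double)] = model.findSynonymsArray("a", 2)
nearest.foreach { case (word, sim) => println(f"$word%s -> $sim%.4f") }
```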
@@ -357,15 +378,16 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
* Calculate the number of partitions to use in saving the model.
* [SPARK-11994] - We want to partition the model in partitions smaller than
* spark.kryoserializer.buffer.max
* @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max
* @param numWords Vocab size
* @param vectorSize Vector length for each word
*
* @param bufferSizeInBytes Set to spark.kryoserializer.buffer.max
* @param numWords Vocab size
* @param vectorSize Vector length for each word
*/
def calculateNumberOfPartitions(
bufferSizeInBytes: Long,
numWords: Int,
vectorSize: Int): Int = {
val floatSize = 4L // Use Long to help avoid overflow
bufferSizeInBytes: Long,
numWords: Int,
vectorSize: Int): Int = {
val floatSize = 4L // Use Long to help avoid overflow
val averageWordSize = 15
// Calculate the approximate size of the model.
// Assuming an average word size of 15 bytes, the formula is:
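The formula itself is elided by this hunk, but from the surrounding definitions it amounts to roughly `(floatSize * vectorSize + averageWordSize) * numWords` bytes, divided across buffer-sized partitions. A worked example under that assumption:

```scala
// Assumed sizing formula; names mirror the method's parameters.
val numWords = 1000000                    // vocabulary of one million words
val vectorSize = 100                      // floats per word vector
val bufferSizeInBytes = 64L * 1024 * 1024 // spark.kryoserializer.buffer.max = 64m
val approxSize = (4L * vectorSize + 15) * numWords // 415,000,000 bytes
val numPartitions = (approxSize / bufferSizeInBytes).toInt + 1 // ~415MB / 64MB => 7
```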
@@ -418,4 +440,4 @@ object Word2VecModel extends MLReadable[Word2VecModel] {

@Since("1.6.0")
override def load(path: String): Word2VecModel = super.load(path)
}
}
mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -76,6 +76,50 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
}
}

test("additionalData") {

val spark = this.spark
import spark.implicits._

val sentence = "a b " * 100 + "a c " * 10
val additionalData = "a b c a c" * 100
val numOfWords = (sentence + additionalData).split(" ").length
val doc = sc.parallelize(Seq(sentence, sentence))
.map(line => line.split(" "))

val codes = Map(
"a" -> Array(-0.2811822295188904, -0.6356269121170044, -0.3020961284637451),
"b" -> Array(1.0309048891067505, -1.29472815990448, 0.22276712954044342),
"c" -> Array(-0.08456747233867645, 0.5137411952018738, 0.11731560528278351)
)

val expected = doc.map { sentence =>
Vectors.dense(sentence.map(codes.apply).reduce((word1, word2) =>
word1.zip(word2).map { case (v1, v2) => v1 + v2 }
).map(_ / numOfWords))
}

val docDF = doc.zip(expected).toDF("text", "expected")

val w2v = new Word2Vec()
.setVectorSize(3)
.setInputCol("text")
.setSupplementaryWords(additionalData.split(" "))
.setOutputCol("result")
.setSeed(42L)
val model = w2v.fit(docDF)

MLTestingUtils.checkCopyAndUids(w2v, model)

// These expectations are just magic values, characterizing the current
// behavior. The test needs to be updated to be more general, see SPARK-11502.
val magicExp = Vectors.dense(0.6622298123653639, -0.4716927178881385, 0.1346504972739653)
model.transform(docDF).select("result", "expected").collect().foreach {
case Row(vector1: Vector, _: Vector) =>
assert(vector1 ~== magicExp absTol 1E-5, "Transformed vector differs from the expected value.")
}
}

test("getVectors") {

val spark = this.spark