Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.ml.clustering

import java.util.Locale

import breeze.linalg.normalize
import breeze.numerics.exp
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
import org.json4s.JsonAST.JObject
Expand All @@ -27,23 +29,23 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel,
EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel,
LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
LDAOptimizer => OldLDAOptimizer, LDAUtils => OldLDAUtils, LocalLDAModel => OldLocalLDAModel,
OnlineLDAOptimizer => OldOnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.linalg.MatrixImplicits._
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
import org.apache.spark.sql.functions.{monotonically_increasing_id, udf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.PeriodicCheckpointer
Expand Down Expand Up @@ -457,21 +459,56 @@ abstract class LDAModel private[ml] (
*/
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
if ($(topicDistributionCol).nonEmpty) {
transformSchema(dataset.schema, logging = true)

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer = oldLocalModel.getTopicDistributionMethod
if ($(topicDistributionCol).nonEmpty) {
val func = getTopicDistributionMethod
val transformer = udf(func)

val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
dataset.withColumn($(topicDistributionCol),
t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF()
transformer(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
} else {
logWarning("LDAModel.transform was called without any output columns. Set an output column" +
" such as topicDistributionCol to produce results.")
dataset.toDF()
}
}

/**
* Get a method usable as a UDF for `topicDistributions()`
*/
private def getTopicDistributionMethod: Vector => Vector = {
val expElogbeta = exp(OldLDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
val oldModel = oldLocalModel
val docConcentrationBrz = oldModel.docConcentration.asBreeze
val gammaShape = oldModel.gammaShape
val k = oldModel.k
val gammaSeed = oldModel.seed

vector: Vector =>
if (vector.numNonzeros == 0) {
Vectors.zeros(k)
} else {
val (ids: List[Int], cts: Array[Double]) = vector match {
case v: DenseVector => ((0 until v.size).toList, v.values)
case v: SparseVector => (v.indices.toList, v.values)
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}

val (gamma, _, _) = OldOnlineLDAOptimizer.variationalTopicInference(
ids,
cts,
expElogbeta,
docConcentrationBrz,
gammaShape,
k,
gammaSeed)
Vectors.dense(normalize(gamma, 1.0).toArray)
}
}

@Since("1.6.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ package org.apache.spark.ml.feature

import java.{util => ju}

import org.json4s.JsonDSL._
import org.json4s.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.Model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.Since
import org.apache.spark.ml._
import org.apache.spark.ml.attribute.{AttributeGroup, _}
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
Expand Down Expand Up @@ -264,14 +264,25 @@ final class ChiSqSelectorModel private[ml] (

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val newField = transformedSchema.last

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML
val outputSchema = transformSchema(dataset.schema, logging = true)

val newSize = selectedFeatures.length
val func = { vector: Vector =>
vector match {
case SparseVector(_, indices, values) =>
val (newIndices, newValues) = chiSqSelector.compressSparse(indices, values)
Vectors.sparse(newSize, newIndices, newValues)
case DenseVector(values) =>
Vectors.dense(chiSqSelector.compressDense(values))
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}

val selector = udf(transformer)
dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
val transformer = udf(func)
dataset.withColumn($(outputCol), transformer(col($(featuresCol))),
outputSchema($(outputCol)).metadata)
}

@Since("1.6.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class LocalLDAModel private[spark] (
override protected[spark] val gammaShape: Double = 100)
extends LDAModel with Serializable {

private var seed: Long = Utils.random.nextLong()
private[spark] var seed: Long = Utils.random.nextLong()

@Since("1.3.0")
override def k: Int = topics.numCols
Expand Down Expand Up @@ -386,31 +386,6 @@ class LocalLDAModel private[spark] (
}
}

/**
* Get a method usable as a UDF for `topicDistributions()`
*/
private[spark] def getTopicDistributionMethod: Vector => Vector = {
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
val docConcentrationBrz = this.docConcentration.asBreeze
val gammaShape = this.gammaShape
val k = this.k
val gammaSeed = this.seed

(termCounts: Vector) =>
if (termCounts.numNonzeros == 0) {
Vectors.zeros(k)
} else {
val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
expElogbeta,
docConcentrationBrz,
gammaShape,
k,
gammaSeed)
Vectors.dense(normalize(gamma, 1.0).toArray)
}
}

/**
* Predicts the topic mixture distribution for a document (often called "theta" in the
* literature). Returns a vector of zeros for an empty document.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging {
* Serializable companion object containing helper methods and shared code for
* [[OnlineLDAOptimizer]] and [[LocalLDAModel]].
*/
private[clustering] object OnlineLDAOptimizer {
private[spark] object OnlineLDAOptimizer {
/**
* Uses variational inference to infer the topic distribution `gammad` given the term counts
* for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
Expand All @@ -608,27 +608,24 @@ private[clustering] object OnlineLDAOptimizer {
* @return Returns a tuple of `gammad` - estimate of gamma, the topic distribution, `sstatsd` -
* statistics for updating lambda and `ids` - list of termCounts vector indices.
*/
private[clustering] def variationalTopicInference(
termCounts: Vector,
private[spark] def variationalTopicInference(
indices: List[Int],
Comment thread
srowen marked this conversation as resolved.
values: Array[Double],
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int,
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)
case v: SparseVector => (v.indices.toList, v.values)
}
// Initialize the variational distribution q(theta|gamma) for the mini-batch
val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(seed))
val gammad: BDV[Double] =
new Gamma(gammaShape, 1.0 / gammaShape)(randBasis).samplesVector(k) // K
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K
val expElogbetad = expElogbeta(indices, ::).toDenseMatrix // ids * K

val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
var meanGammaChange = 1D
val ctsVector = new BDV[Double](cts) // ids
val ctsVector = new BDV[Double](values) // ids

// Iterate between gamma and phi until convergence
while (meanGammaChange > 1e-3) {
Expand All @@ -642,6 +639,20 @@ private[clustering] object OnlineLDAOptimizer {
}

val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix
(gammad, sstatsd, ids)
(gammad, sstatsd, indices)
}

private[clustering] def variationalTopicInference(
termCounts: Vector,
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int,
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere, as an optimization, can we avoid (0 until v.size).toList)? pass an empty list in this case or something, and then deduce that the indices are just the same length as the values?

You're generally solving this with separate sparse/dense methods which could be fine too if it doesn't result in too much code duplication and improves performance in the dense case.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good then except we might be able to make one more optimization here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just look into the usage of indices ids, and find that it is used as slicing indices like val expElogbetad = expElogbeta(indices, ::).toDenseMatrix.
I will have a try.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that an empty list may not help to simplify the impl.
since in place like private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer, we still have to create a List for slicing.

case v: SparseVector => (v.indices.toList, v.values)
}
variationalTopicInference(ids, cts, expElogbeta, alpha, gammaShape, k, seed)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import breeze.numerics._
/**
* Utility methods for LDA.
*/
private[clustering] object LDAUtils {
private[spark] object LDAUtils {
/**
* Log Sum Exp with overflow protection using the identity:
* For any a: $\log \sum_{n=1}^N \exp\{x_n\} = a + \log \sum_{n=1}^N \exp\{x_n - a\}$
Expand All @@ -44,7 +44,7 @@ private[clustering] object LDAUtils {
* Computes [[dirichletExpectation()]] row-wise, assuming each row of alpha are
* Dirichlet parameters.
*/
private[clustering] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
private[spark] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
val rowSum = sum(alpha(breeze.linalg.*, ::))
val digAlpha = digamma(alpha)
val digRowSum = digamma(rowSum)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,40 +75,48 @@ class ChiSqSelectorModel @Since("1.3.0") (
private def compress(features: Vector): Vector = {
features match {
case SparseVector(size, indices, values) =>
val newSize = filterIndices.length
val newValues = new ArrayBuilder.ofDouble
val newIndices = new ArrayBuilder.ofInt
var i = 0
var j = 0
var indicesIdx = 0
var filterIndicesIdx = 0
while (i < indices.length && j < filterIndices.length) {
indicesIdx = indices(i)
filterIndicesIdx = filterIndices(j)
if (indicesIdx == filterIndicesIdx) {
newIndices += j
newValues += values(i)
j += 1
i += 1
} else {
if (indicesIdx > filterIndicesIdx) {
j += 1
} else {
i += 1
}
}
}
// TODO: Sparse representation might be ineffective if (newSize ~= newValues.size)
Vectors.sparse(newSize, newIndices.result(), newValues.result())
val (newIndices, newValues) = compressSparse(indices, values)
Vectors.sparse(filterIndices.length, newIndices, newValues)
case DenseVector(values) =>
val values = features.toArray
Vectors.dense(filterIndices.map(i => values(i)))
Vectors.dense(compressDense(values))
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}

private[spark] def compressSparse(indices: Array[Int],
values: Array[Double]): (Array[Int], Array[Double]) = {
val newValues = new ArrayBuilder.ofDouble
val newIndices = new ArrayBuilder.ofInt
var i = 0
var j = 0
var indicesIdx = 0
var filterIndicesIdx = 0
while (i < indices.length && j < filterIndices.length) {
indicesIdx = indices(i)
filterIndicesIdx = filterIndices(j)
if (indicesIdx == filterIndicesIdx) {
newIndices += j
newValues += values(i)
j += 1
i += 1
} else {
if (indicesIdx > filterIndicesIdx) {
j += 1
} else {
i += 1
}
}
}
// TODO: Sparse representation might be ineffective if (newSize ~= newValues.size)
(newIndices.result(), newValues.result())
}

private[spark] def compressDense(values: Array[Double]): Array[Double] = {
filterIndices.map(i => values(i))
}

@Since("1.6.0")
override def save(sc: SparkContext, path: String): Unit = {
ChiSqSelectorModel.SaveLoadV1_0.save(sc, this, path)
Expand Down