diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 46a0730f5ddb8..add9b0f1c15f7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.ml.feature
 
+import breeze.linalg.{DenseVector => BDV}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -86,13 +85,23 @@ final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): IDFModel = {
     transformSchema(dataset.schema, logging = true)
-    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
-      case Row(v: Vector) => OldVectors.fromML(v)
+    val input: RDD[Vector] = dataset.select($(inputCol)).rdd.map {
+      case Row(v: Vector) => v
     }
-    val idf = new feature.IDF($(minDocFreq)).fit(input)
+    val idf = internalFit(input)
     copyValues(new IDFModel(uid, idf).setParent(this))
   }
 
+  /**
+   * Aggregates per-term document frequencies over `dataset` and returns the IDF vector.
+   */
+  private def internalFit(dataset: RDD[Vector]): Vector = {
+    dataset.treeAggregate(new DocumentFrequencyAggregator($(minDocFreq)))(
+      seqOp = (df, v) => df.add(v),
+      combOp = (df1, df2) => df1.merge(df2)
+    ).idf()
+  }
+
   @Since("1.4.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
@@ -107,15 +116,114 @@ object IDF extends DefaultParamsReadable[IDF] {
 
   @Since("1.6.0")
   override def load(path: String): IDF = super.load(path)
 }
 
+/**
+ * Document frequency aggregator.
+ *
+ * Counts, for every term index, the number of documents in which the term has a
+ * positive value, and derives the IDF vector from those counts.
+ *
+ * @param minDocFreq minimum number of documents in which a term must appear; terms
+ *                   seen in fewer documents get an IDF of 0
+ */
+private class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {
+
+  /** number of documents */
+  private var m = 0L
+  /** document frequency vector, allocated when the first document is added */
+  private var df: BDV[Long] = _
+
+  def this() = this(0)
+
+  /**
+   * Adds a new document. Only strictly positive entries count towards a term's
+   * document frequency.
+   */
+  def add(doc: Vector): this.type = {
+    if (isEmpty) {
+      df = BDV.zeros(doc.size)
+    }
+    // All documents must share one dimension; a mismatch would otherwise be silently
+    // accepted or surface as an index-out-of-bounds error deep inside the loop.
+    require(doc.size == df.length,
+      s"Expected a vector of size ${df.length} but got one of size ${doc.size}.")
+    doc match {
+      case SparseVector(_, indices, values) =>
+        val nnz = indices.length
+        var k = 0
+        while (k < nnz) {
+          if (values(k) > 0.0) {
+            df(indices(k)) += 1L
+          }
+          k += 1
+        }
+      case DenseVector(values) =>
+        val n = values.length
+        var j = 0
+        while (j < n) {
+          if (values(j) > 0.0) {
+            df(j) += 1L
+          }
+          j += 1
+        }
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+    }
+    m += 1L
+    this
+  }
+
+  /** Merges the document count and document frequencies of another aggregator. */
+  def merge(other: DocumentFrequencyAggregator): this.type = {
+    if (!other.isEmpty) {
+      m += other.m
+      if (df == null) {
+        df = other.df.copy
+      } else {
+        df += other.df
+      }
+    }
+    this
+  }
+
+  private def isEmpty: Boolean = m == 0L
+
+  /** Returns the current IDF vector. */
+  def idf(): Vector = {
+    if (isEmpty) {
+      throw new IllegalStateException("Haven't seen any document yet.")
+    }
+    val n = df.length
+    val inv = new Array[Double](n)
+    var j = 0
+    while (j < n) {
+      /*
+       * If the term is not present in the minimum
+       * number of documents, set IDF to 0. This
+       * will cause multiplication in IDFModel to
+       * set TF-IDF to 0.
+       *
+       * Since arrays are initialized to 0 by default,
+       * we just omit changing those entries.
+       */
+      if (df(j) >= minDocFreq) {
+        inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
+      }
+      j += 1
+    }
+    Vectors.dense(inv)
+  }
+}
+
 /**
  * Model fitted by [[IDF]].
  */
 @Since("1.4.0")
 class IDFModel private[ml] (
     @Since("1.4.0") override val uid: String,
-    idfModel: feature.IDFModel)
+    vector: Vector)
   extends Model[IDFModel] with IDFBase with MLWritable {
 
   import IDFModel._
@@ -132,10 +240,39 @@ class IDFModel private[ml] (
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
-    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
+    val idf = udf { vec: Vector => transformVectors(vec) }
     dataset.withColumn($(outputCol), idf(col($(inputCol))))
   }
 
+  /**
+   * Scales each term frequency in `vec` by the IDF weight stored at the same index.
+   * Sparse input yields a sparse result and dense input a dense one.
+   */
+  private def transformVectors(vec: Vector): Vector = {
+    val n = vec.size
+    vec match {
+      case SparseVector(_, indices, values) =>
+        val nnz = indices.length
+        val newValues = new Array[Double](nnz)
+        var k = 0
+        while (k < nnz) {
+          newValues(k) = values(k) * vector(indices(k))
+          k += 1
+        }
+        Vectors.sparse(n, indices, newValues)
+      case DenseVector(values) =>
+        val newValues = new Array[Double](n)
+        var j = 0
+        while (j < n) {
+          newValues(j) = values(j) * vector(j)
+          j += 1
+        }
+        Vectors.dense(newValues)
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+    }
+  }
+
   @Since("1.4.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema)
@@ -143,13 +280,13 @@ class IDFModel private[ml] (
 
   @Since("1.4.1")
   override def copy(extra: ParamMap): IDFModel = {
-    val copied = new IDFModel(uid, idfModel)
+    val copied = new IDFModel(uid, vector)
     copyValues(copied, extra).setParent(parent)
   }
 
   /** Returns the IDF vector. */
   @Since("2.0.0")
-  def idf: Vector = idfModel.idf.asML
+  def idf: Vector = vector
 
   @Since("1.6.0")
   override def write: MLWriter = new IDFModelWriter(this)
@@ -181,7 +318,7 @@ object IDFModel extends MLReadable[IDFModel] {
       val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
         .select("idf")
         .head()
-      val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
+      val model = new IDFModel(metadata.uid, idf)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf73d29be..083cd6fde033e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -22,8 +22,6 @@ import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
-import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
 
@@ -46,7 +44,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("params") {
     ParamsSuite.checkParams(new IDF)
-    val model = new IDFModel("idf", new OldIDFModel(Vectors.dense(1.0)))
+    val model = new IDFModel("idf", Vectors.dense(1.0))
     ParamsSuite.checkParams(model)
   }
 
@@ -114,7 +112,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
   }
 
   test("IDFModel read/write") {
-    val instance = new IDFModel("myIDFModel", new OldIDFModel(Vectors.dense(1.0, 2.0)))
+    val instance = new IDFModel("myIDFModel", Vectors.dense(1.0, 2.0))
       .setInputCol("myInputCol")
       .setOutputCol("myOutputCol")
     val newInstance = testDefaultReadWrite(instance)