From 559c09904538012b70bcb3493b8bc287dd855b2d Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Mon, 7 Nov 2016 13:30:32 -0800 Subject: [PATCH 01/20] [SPARK-18334] MinHash should use binary hash distance --- .../main/scala/org/apache/spark/ml/feature/MinHash.scala | 6 +++++- .../scala/org/apache/spark/ml/feature/MinHashSuite.scala | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala index d9d0f32254e24..e7ba6375168e1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala @@ -76,7 +76,11 @@ class MinHashModel private[ml] ( @Since("2.1.0") override protected[ml] def hashDistance(x: Vector, y: Vector): Double = { // Since it's generated by hashing, it will be a pair of dense vectors. - x.toDense.values.zip(y.toDense.values).map(pair => math.abs(pair._1 - pair._2)).min + if (x.toDense.values.zip(y.toDense.values).exists(pair => pair._1 == pair._2)) { + 0 + } else { + 1 + } } @Since("2.1.0") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala index c32ca7d69cf84..cf6290b315abc 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala @@ -69,9 +69,11 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default val v1 = Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0))) val v2 = Vectors.sparse(10, Seq((1, 1.0), (3, 1.0), (5, 1.0), (7, 1.0), (9, 1.0))) val keyDist = model.keyDistance(v1, v2) - val hashDist = model.hashDistance(Vectors.dense(-5, 5), Vectors.dense(1, 2)) + val hashDist1 = model.hashDistance(Vectors.dense(1, 2), Vectors.dense(3, 4)) + val hashDist2 = model.hashDistance(Vectors.dense(1, 2), Vectors.dense(3, 2)) assert(keyDist === 0.5) - assert(hashDist === 3) + assert(hashDist1 === 1.0) + assert(hashDist2 === 0.0) } test("MinHash: test of LSH property") { From 517a97bd16f3771d9abbcdf54957a011f5f87adc Mon Sep 17 00:00:00 2001 From: Yunni Date: Tue, 8 Nov 2016 01:15:24 -0500 Subject: [PATCH 02/20] Remove misleading documentation as requested --- .../main/scala/org/apache/spark/ml/feature/MinHash.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala index e7ba6375168e1..488c4ede5f457 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala @@ -32,13 +32,7 @@ import org.apache.spark.sql.types.StructType * :: Experimental :: * * Model produced by [[MinHash]], where multiple hash functions are stored. Each hash function is - * a perfect hash function: - * `h_i(x) = (x * k_i mod prime) mod numEntries` - * where `k_i` is the i-th coefficient, and both `x` and `k_i` are from `Z_prime^*` - * - * Reference: - * [[https://en.wikipedia.org/wiki/Perfect_hash_function Wikipedia on Perfect Hash Function]] - * + * a perfect hash. * @param numEntries The number of entries of the hash functions. * @param randCoefficients An array of random coefficients, each used by one hash function. 
*/ From b546dbd207a04e73bde097f25cae8c927322c2ae Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 8 Nov 2016 10:54:09 -0800 Subject: [PATCH 03/20] Add warning for multi-probe in MinHash --- .../org/apache/spark/ml/feature/LSH.scala | 18 +++++++++++++----- .../org/apache/spark/ml/feature/MinHash.scala | 8 ++++++++ .../org/apache/spark/ml/feature/LSHTest.scala | 7 ++++--- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 333a8c364a884..a0b2e75bdd02c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -99,6 +99,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] validateAndTransformSchema(schema) } + /** + * Check prerequisite for nearest neighbor. This method will be overridden in subclasses. + * + * @param singleProbe True for using single-probe; false for multi-probe + */ + protected[this] def checkNearestNeighbor(singleProbe: Boolean) = {} + /** * Given a large dataset and an item, approximately find at most k items which have the closest * distance to the item. If the [[outputCol]] is missing, the method will transform the data; if @@ -106,13 +113,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * transformed data when necessary. * * This method implements two ways of fetching k nearest neighbors: - * - Single Probing: Fast, return at most k elements (Probing only one buckets) - * - Multiple Probing: Slow, return exact k elements (Probing multiple buckets close to the key) + * - Single-probe: Fast, return at most k elements (Probing only one buckets) + * - Multi-probe: Slow, return exact k elements (Probing multiple buckets close to the key) * * @param dataset the dataset to search for nearest neighbors of the key * @param key Feature vector representing the item to search for * @param numNearestNeighbors The maximum number of nearest neighbors - * @param singleProbing True for using Single Probing; false for multiple probing + * @param singleProbe True for using single-probe; false for multi-probe * @param distCol Output column for storing the distance between each result row and the key * @return A dataset containing at most k items closest to the key. A distCol is added to show * the distance between each row and the key. @@ -121,9 +128,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] dataset: Dataset[_], key: Vector, numNearestNeighbors: Int, - singleProbing: Boolean, + singleProbe: Boolean, distCol: String): Dataset[_] = { require(numNearestNeighbors > 0, "The number of nearest neighbors cannot be less than 1") + checkNearestNeighbor(singleProbe) // Get Hash Value of the key val keyHash = hashFunction(key) val modelDataset: DataFrame = if (!dataset.columns.contains($(outputCol))) { @@ -136,7 +144,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] val hashDistUDF = udf((x: Vector) => hashDistance(x, keyHash), DataTypes.DoubleType) val hashDistCol = hashDistUDF(col($(outputCol))) - val modelSubset = if (singleProbing) { + val modelSubset = if (singleProbe) { modelDataset.filter(hashDistCol === 0.0) } else { // Compute threshold to get exact k elements. 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala index e7ba6375168e1..6b99fbdb217a9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala @@ -83,6 +83,14 @@ class MinHashModel private[ml] ( } } + @Since("2.1.0") + override protected[this] def checkNearestNeighbor(singleProbe: Boolean) = { + if (!singleProbe) { + log.warn("Multi-probe for MinHash will run brute force nearest neighbor when there " + + "aren't enough candidates.") + } + } + @Since("2.1.0") override def copy(extra: ParamMap): this.type = defaultCopy(extra) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala index 5c025546f332b..f7ded60fb6542 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala @@ -83,6 +83,7 @@ private[ml] object LSHTest { * @param dataset the dataset to look for the key * @param key The key to hash for the item * @param k The maximum number of items closest to the key + * @param singleProbe True for using single-probe; false for multi-probe * @tparam T The class type of lsh * @return A tuple of two doubles, representing precision and recall rate */ @@ -91,7 +92,7 @@ private[ml] object LSHTest { dataset: Dataset[_], key: Vector, k: Int, - singleProbing: Boolean): (Double, Double) = { + singleProbe: Boolean): (Double, Double) = { val model = lsh.fit(dataset) // Compute expected @@ -99,14 +100,14 @@ private[ml] object LSHTest { val expected = dataset.sort(distUDF(col(model.getInputCol))).limit(k) // Compute actual - val actual = model.approxNearestNeighbors(dataset, key, k, singleProbing, "distCol") + val actual = model.approxNearestNeighbors(dataset, key, k, singleProbe, "distCol") assert(actual.schema.sameType(model .transformSchema(dataset.schema) .add("distCol", DataTypes.DoubleType)) ) - if (!singleProbing) { + if (!singleProbe) { assert(actual.count() == k) } From c8243c7def8c270072edd5889cea7fd02677b44f Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Wed, 9 Nov 2016 15:11:20 -0800 Subject: [PATCH 04/20] (1) Fix documentation as CR suggested (2) Fix typo in unit test --- .../src/main/scala/org/apache/spark/ml/feature/MinHash.scala | 4 +++- .../test/scala/org/apache/spark/ml/feature/MinHashSuite.scala | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala index 82ac9b0888b23..8b320c5bbb77f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala @@ -32,7 +32,9 @@ import org.apache.spark.sql.types.StructType * :: Experimental :: * * Model produced by [[MinHash]], where multiple hash functions are stored. Each hash function is - * a perfect hash. + * a perfect hash function for a specific set `S` with cardinality equal to a half of `numEntries`: + * `h_i(x) = ((x \cdot k_i) \mod prime) \mod numEntries` + * * @param numEntries The number of entries of the hash functions. * @param randCoefficients An array of random coefficients, each used by one hash function. 
*/ diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala index cf6290b315abc..d05f693cc961f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala @@ -99,7 +99,7 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default (0 until 100).filter(_.toString.contains("1")).map((_, 1.0))) val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(mh, dataset, key, 20, - singleProbing = true) + singleProbe = true) assert(precision >= 0.7) assert(recall >= 0.7) } From 6aac8b343c5ea3a91b8517a2d3f47ed055ece9ad Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Wed, 9 Nov 2016 15:22:27 -0800 Subject: [PATCH 05/20] Fix typo in unit test --- .../org/apache/spark/ml/feature/RandomProjectionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala index cd82ee2117a07..07f95527fcfde 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala @@ -138,7 +138,7 @@ class RandomProjectionSuite .setSeed(12345) val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(rp, dataset, key, 100, - singleProbing = true) + singleProbe = true) assert(precision >= 0.6) assert(recall >= 0.6) } @@ -154,7 +154,7 @@ class RandomProjectionSuite .setSeed(12345) val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(rp, dataset, key, 100, - singleProbing = false) + singleProbe = false) assert(precision >= 0.7) assert(recall >= 0.7) } From 98707436ea8a90599fd8615a47afff3bf29a3ae6 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Sun, 13 Nov 2016 20:25:17 -0800 Subject: [PATCH 06/20] [SPARK-18408] API Improvements for LSH --- ...cala => BucketedRandomProjectionLSH.scala} | 75 +++++++------ .../org/apache/spark/ml/feature/LSH.scala | 103 ++++++++++++------ .../{MinHash.scala => MinHashLSH.scala} | 54 ++++----- ...=> BucketedRandomProjectionLSHSuite.scala} | 90 ++++++++------- .../org/apache/spark/ml/feature/LSHTest.scala | 5 +- ...nHashSuite.scala => MinHashLSHSuite.scala} | 42 +++---- 6 files changed, 216 insertions(+), 153 deletions(-) rename mllib/src/main/scala/org/apache/spark/ml/feature/{RandomProjection.scala => BucketedRandomProjectionLSH.scala} (68%) rename mllib/src/main/scala/org/apache/spark/ml/feature/{MinHash.scala => MinHashLSH.scala} (79%) rename mllib/src/test/scala/org/apache/spark/ml/feature/{RandomProjectionSuite.scala => BucketedRandomProjectionLSHSuite.scala} (68%) rename mllib/src/test/scala/org/apache/spark/ml/feature/{MinHashSuite.scala => MinHashLSHSuite.scala} (81%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala similarity index 68% rename from mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala rename to mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index 1b524c6710b42..7835ddfdf8763 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RandomProjection.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -34,9 +34,9 @@ import 
org.apache.spark.sql.types.StructType /** * :: Experimental :: * - * Params for [[RandomProjection]]. + * Params for [[BucketedRandomProjectionLSH]]. */ -private[ml] trait RandomProjectionParams extends Params { +private[ml] trait BucketedRandomProjectionParams extends Params { /** * The length of each hash bucket, a larger bucket lowers the false negative rate. The number of @@ -58,8 +58,8 @@ private[ml] trait RandomProjectionParams extends Params { /** * :: Experimental :: * - * Model produced by [[RandomProjection]], where multiple random vectors are stored. The vectors - * are normalized to be unit vectors and each vector is used in a hash function: + * Model produced by [[BucketedRandomProjectionLSH]], where multiple random vectors are stored. The + * vectors are normalized to be unit vectors and each vector is used in a hash function: * `h_i(x) = floor(r_i.dot(x) / bucketLength)` * where `r_i` is the i-th random unit vector. The number of buckets will be `(max L2 norm of input * vectors) / bucketLength`. @@ -68,18 +68,18 @@ private[ml] trait RandomProjectionParams extends Params { */ @Experimental @Since("2.1.0") -class RandomProjectionModel private[ml] ( +class BucketedRandomProjectionModel private[ml]( override val uid: String, - @Since("2.1.0") val randUnitVectors: Array[Vector]) - extends LSHModel[RandomProjectionModel] with RandomProjectionParams { + @Since("2.1.0") private[ml] val randUnitVectors: Array[Vector]) + extends LSHModel[BucketedRandomProjectionModel] with BucketedRandomProjectionParams { @Since("2.1.0") - override protected[ml] val hashFunction: (Vector) => Vector = { + override protected[ml] val hashFunction: Vector => Array[Vector] = { key: Vector => { val hashValues: Array[Double] = randUnitVectors.map({ randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength)) }) - Vectors.dense(hashValues) + hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray } } @@ -89,23 +89,30 @@ class RandomProjectionModel private[ml] ( } @Since("2.1.0") - override protected[ml] def hashDistance(x: Vector, y: Vector): Double = { + override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = { // Since it's generated by hashing, it will be a pair of dense vectors. - x.toDense.values.zip(y.toDense.values).map(pair => math.abs(pair._1 - pair._2)).min + x.zip(y).map(vectorPair => Vectors.sqdist(vectorPair._1, vectorPair._2)).min + } + + @Since("2.1.0") + override protected[ml] def validateDimension(): Unit = { + require(randUnitVectors.length == ($(numHashFunctions) * $(numHashTables))) } @Since("2.1.0") override def copy(extra: ParamMap): this.type = defaultCopy(extra) @Since("2.1.0") - override def write: MLWriter = new RandomProjectionModel.RandomProjectionModelWriter(this) + override def write: MLWriter = { + new BucketedRandomProjectionModel.BucketedRandomProjectionModelWriter(this) + } } /** * :: Experimental :: * - * This [[RandomProjection]] implements Locality Sensitive Hashing functions for Euclidean - * distance metrics. + * This [[BucketedRandomProjectionLSH]] implements Locality Sensitive Hashing functions for + * Euclidean distance metrics. * * The input is dense or sparse vectors, each of which represents a point in the Euclidean * distance space. The output will be vectors of configurable dimension. 
Hash value in the same @@ -121,8 +128,8 @@ class RandomProjectionModel private[ml] ( */ @Experimental @Since("2.1.0") -class RandomProjection(override val uid: String) extends LSH[RandomProjectionModel] - with RandomProjectionParams with HasSeed { +class BucketedRandomProjectionLSH(override val uid: String) + extends LSH[BucketedRandomProjectionModel] with BucketedRandomProjectionParams with HasSeed { @Since("2.1.0") override def setInputCol(value: String): this.type = super.setInputCol(value) @@ -131,7 +138,10 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod override def setOutputCol(value: String): this.type = super.setOutputCol(value) @Since("2.1.0") - override def setOutputDim(value: Int): this.type = super.setOutputDim(value) + override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) + + @Since("2.1.0") + override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) @Since("2.1.0") def this() = { @@ -147,15 +157,15 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod def setSeed(value: Long): this.type = set(seed, value) @Since("2.1.0") - override protected[this] def createRawLSHModel(inputDim: Int): RandomProjectionModel = { + override protected[this] def createRawLSHModel(inputDim: Int): BucketedRandomProjectionModel = { val rand = new Random($(seed)) val randUnitVectors: Array[Vector] = { - Array.fill($(outputDim)) { + Array.fill($(numHashTables) * $(numHashFunctions)) { val randArray = Array.fill(inputDim)(rand.nextGaussian()) Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray))) } } - new RandomProjectionModel(uid, randUnitVectors) + new BucketedRandomProjectionModel(uid, randUnitVectors) } @Since("2.1.0") @@ -169,23 +179,25 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod } @Since("2.1.0") -object RandomProjection extends DefaultParamsReadable[RandomProjection] { +object BucketedRandomProjectionLSH extends DefaultParamsReadable[BucketedRandomProjectionLSH] { @Since("2.1.0") - override def load(path: String): RandomProjection = super.load(path) + override def load(path: String): BucketedRandomProjectionLSH = super.load(path) } @Since("2.1.0") -object RandomProjectionModel extends MLReadable[RandomProjectionModel] { +object BucketedRandomProjectionModel extends MLReadable[BucketedRandomProjectionModel] { @Since("2.1.0") - override def read: MLReader[RandomProjectionModel] = new RandomProjectionModelReader + override def read: MLReader[BucketedRandomProjectionModel] = { + new BucketedRandomProjectionModelReader + } @Since("2.1.0") - override def load(path: String): RandomProjectionModel = super.load(path) + override def load(path: String): BucketedRandomProjectionModel = super.load(path) - private[RandomProjectionModel] class RandomProjectionModelWriter(instance: RandomProjectionModel) - extends MLWriter { + private[BucketedRandomProjectionModel] class BucketedRandomProjectionModelWriter( + instance: BucketedRandomProjectionModel) extends MLWriter { // TODO: Save using the existing format of Array[Vector] once SPARK-12878 is resolved. 
private case class Data(randUnitVectors: Matrix) @@ -203,12 +215,13 @@ object RandomProjectionModel extends MLReadable[RandomProjectionModel] { } } - private class RandomProjectionModelReader extends MLReader[RandomProjectionModel] { + private class BucketedRandomProjectionModelReader + extends MLReader[BucketedRandomProjectionModel] { /** Checked against metadata when loading model */ - private val className = classOf[RandomProjectionModel].getName + private val className = classOf[BucketedRandomProjectionModel].getName - override def load(path: String): RandomProjectionModel = { + override def load(path: String): BucketedRandomProjectionModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString @@ -216,7 +229,7 @@ object RandomProjectionModel extends MLReadable[RandomProjectionModel] { val Row(randUnitVectors: Matrix) = MLUtils.convertMatrixColumnsToML(data, "randUnitVectors") .select("randUnitVectors") .head() - val model = new RandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray) + val model = new BucketedRandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray) DefaultParamsReader.getAndSetParams(model, metadata) model diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index a0b2e75bdd02c..987c6c70d8809 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -35,18 +35,32 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { /** * Param for the dimension of LSH OR-amplification. * - * In this implementation, we use LSH OR-amplification to reduce the false negative rate. The - * higher the dimension is, the lower the false negative rate. + * LSH OR-amplification can be used to reduce the false negative rate. The higher the dimension + * is, the lower the false negative rate. * @group param */ - final val outputDim: IntParam = new IntParam(this, "outputDim", "output dimension, where" + + final val numHashTables: IntParam = new IntParam(this, "numHashTables", "number of hash " + + "tables, where increasing number of hash tables lowers the false negative rate, and " + + "decreasing it improves the running performance", ParamValidators.gt(0)) + + /** + * Param for the dimension of LSH AND-amplification. + * + * LSH AND-amplification can be used to reduce the false positive rate. The higher the dimension + * is, the lower the false positive rate. 
+ * @group param + */ + final val numHashFunctions: IntParam = new IntParam(this, "numHashFunctions", "output dimension, where" + "increasing dimensionality lowers the false negative rate, and decreasing dimensionality" + " improves the running performance", ParamValidators.gt(0)) /** @group getParam */ - final def getOutputDim: Int = $(outputDim) + final def getNumHashFunctions: Int = $(numHashFunctions) - setDefault(outputDim -> 1) + /** @group getParam */ + final def getNumHashTables: Int = $(numHashTables) + + setDefault(numHashTables -> 1, numHashFunctions -> 1) /** * Transform the Schema for LSH @@ -54,7 +68,7 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { * @return A derived schema with [[outputCol]] added */ protected[this] final def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT) + SchemaUtils.appendColumn(schema, $(outputCol), DataTypes.createArrayType(new VectorUDT)) } } @@ -66,10 +80,10 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] self: T => /** - * The hash function of LSH, mapping a predefined KeyType to a Vector + * The hash function of LSH, mapping an input feature to multiple vectors * @return The mapping of LSH function. */ - protected[ml] val hashFunction: Vector => Vector + protected[ml] val hashFunction: Vector => Array[Vector] /** * Calculate the distance between two different keys using the distance metric corresponding @@ -87,11 +101,14 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * @param y Another hash vector * @return The distance between hash vectors x and y */ - protected[ml] def hashDistance(x: Vector, y: Vector): Double + protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double + + protected[ml] def validateDimension(): Unit override def transform(dataset: Dataset[_]): DataFrame = { + validateDimension() transformSchema(dataset.schema, logging = true) - val transformUDF = udf(hashFunction, new VectorUDT) + val transformUDF = udf(hashFunction, DataTypes.createArrayType(new VectorUDT)) dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol)))) } @@ -99,13 +116,6 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] validateAndTransformSchema(schema) } - /** - * Check prerequisite for nearest neighbor. This method will be overridden in subclasses. - * - * @param singleProbe True for using single-probe; false for multi-probe - */ - protected[this] def checkNearestNeighbor(singleProbe: Boolean) = {} - /** * Given a large dataset and an item, approximately find at most k items which have the closest * distance to the item. If the [[outputCol]] is missing, the method will transform the data; if @@ -116,6 +126,8 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * - Single-probe: Fast, return at most k elements (Probing only one buckets) * - Multi-probe: Slow, return exact k elements (Probing multiple buckets close to the key) * + * Currently it is made private since more discussion is needed for Multi-probe + * * @param dataset the dataset to search for nearest neighbors of the key * @param key Feature vector representing the item to search for * @param numNearestNeighbors The maximum number of nearest neighbors @@ -124,14 +136,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * @return A dataset containing at most k items closest to the key. A distCol is added to show * the distance between each row and the key. 
*/ - def approxNearestNeighbors( + private[feature] def approxNearestNeighbors( dataset: Dataset[_], key: Vector, numNearestNeighbors: Int, singleProbe: Boolean, distCol: String): Dataset[_] = { require(numNearestNeighbors > 0, "The number of nearest neighbors cannot be less than 1") - checkNearestNeighbor(singleProbe) // Get Hash Value of the key val keyHash = hashFunction(key) val modelDataset: DataFrame = if (!dataset.columns.contains($(outputCol))) { @@ -140,14 +151,24 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] dataset.toDF() } - // In the origin dataset, find the hash value that is closest to the key - val hashDistUDF = udf((x: Vector) => hashDistance(x, keyHash), DataTypes.DoubleType) - val hashDistCol = hashDistUDF(col($(outputCol))) - val modelSubset = if (singleProbe) { - modelDataset.filter(hashDistCol === 0.0) + def sameBucket(x: Seq[Vector], y: Seq[Vector]): Boolean = { + x.zip(y).exists(tuple => tuple._1 == tuple._2) + } + + // In the origin dataset, find the hash value that hash the same bucket with the key + val sameBucketWithKeyUDF = udf((x: Seq[Vector]) => + sameBucket(x, keyHash), DataTypes.BooleanType) + + modelDataset.filter(sameBucketWithKeyUDF(col($(outputCol)))) } else { + // In the origin dataset, find the hash value that is closest to the key + // Limit the use of hashDist since it's controversial + val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) + val hashDistCol = hashDistUDF(col($(outputCol))) + // Compute threshold to get exact k elements. + // TODO: SPARK-18409: Use approxQuantile to get the threshold val modelDatasetSortedByHash = modelDataset.sort(hashDistCol).limit(numNearestNeighbors) val thresholdDataset = modelDatasetSortedByHash.select(max(hashDistCol)) val hashThreshold = thresholdDataset.take(1).head.getDouble(0) @@ -163,8 +184,28 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] } /** - * Overloaded method for approxNearestNeighbors. Use Single Probing as default way to search - * nearest neighbors and "distCol" as default distCol. + * Given a large dataset and an item, approximately find at most k items which have the closest + * distance to the item. If the [[outputCol]] is missing, the method will transform the data; if + * the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the + * transformed data when necessary. + * + * @param dataset the dataset to search for nearest neighbors of the key + * @param key Feature vector representing the item to search for + * @param numNearestNeighbors The maximum number of nearest neighbors + * @param distCol Output column for storing the distance between each result row and the key + * @return A dataset containing at most k items closest to the key. A distCol is added to show + * the distance between each row and the key. + */ + private[feature] def approxNearestNeighbors( + dataset: Dataset[_], + key: Vector, + numNearestNeighbors: Int, + distCol: String): Dataset[_] = { + approxNearestNeighbors(dataset, key, numNearestNeighbors, true, distCol) + } + + /** + * Overloaded method for approxNearestNeighbors. Use "distCol" as default distCol. 
*/ def approxNearestNeighbors( dataset: Dataset[_], @@ -187,16 +228,13 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] inputName: String, explodeCols: Seq[String]): Dataset[_] = { require(explodeCols.size == 2, "explodeCols must be two strings.") - val vectorToMap = udf((x: Vector) => x.asBreeze.iterator.toMap, - MapType(DataTypes.IntegerType, DataTypes.DoubleType)) val modelDataset: DataFrame = if (!dataset.columns.contains($(outputCol))) { transform(dataset) } else { dataset.toDF() } modelDataset.select( - struct(col("*")).as(inputName), - explode(vectorToMap(col($(outputCol)))).as(explodeCols)) + struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols)) } /** @@ -301,7 +339,10 @@ private[ml] abstract class LSH[T <: LSHModel[T]] def setOutputCol(value: String): this.type = set(outputCol, value) /** @group setParam */ - def setOutputDim(value: Int): this.type = set(outputDim, value) + def setNumHashFunctions(value: Int): this.type = set(numHashFunctions, value) + + /** @group setParam */ + def setNumHashTables(value: Int): this.type = set(numHashTables, value) /** * Validate and create a new instance of concrete LSHModel. Because different LSHModel may have diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala similarity index 79% rename from mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala rename to mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 8b320c5bbb77f..34a5bb551527e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHash.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType /** * :: Experimental :: * - * Model produced by [[MinHash]], where multiple hash functions are stored. Each hash function is + * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function is * a perfect hash function for a specific set `S` with cardinality equal to a half of `numEntries`: * `h_i(x) = ((x \cdot k_i) \mod prime) \mod numEntries` * @@ -43,20 +43,21 @@ import org.apache.spark.sql.types.StructType class MinHashModel private[ml] ( override val uid: String, @Since("2.1.0") val numEntries: Int, - @Since("2.1.0") val randCoefficients: Array[Int]) + @Since("2.1.0") private[ml] val randCoefficients: Array[Int]) extends LSHModel[MinHashModel] { @Since("2.1.0") - override protected[ml] val hashFunction: Vector => Vector = { - elems: Vector => + override protected[ml] val hashFunction: Vector => Array[Vector] = { + elems: Vector => { require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.") val elemsList = elems.toSparse.indices.toList val hashValues = randCoefficients.map({ randCoefficient: Int => - elemsList.map({elem: Int => - (1 + elem) * randCoefficient.toLong % MinHash.prime % numEntries - }).min.toDouble + elemsList.map({ elem: Int => + (1 + elem) * randCoefficient.toLong % MinHashLSH.prime % numEntries + }).min.toDouble }) - Vectors.dense(hashValues) + hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray + } } @Since("2.1.0") @@ -70,21 +71,17 @@ class MinHashModel private[ml] ( } @Since("2.1.0") - override protected[ml] def hashDistance(x: Vector, y: Vector): Double = { + override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = { // Since it's generated by hashing, it will be a pair of dense vectors. 
- if (x.toDense.values.zip(y.toDense.values).exists(pair => pair._1 == pair._2)) { - 0 - } else { - 1 - } + // TODO: This hashDistance function is controversial. Requires more discussion. + x.zip(y).map(vectorPair => + vectorPair._1.toArray.zip(vectorPair._2.toArray).count(pair => pair._1 != pair._2) + ).min } @Since("2.1.0") - override protected[this] def checkNearestNeighbor(singleProbe: Boolean) = { - if (!singleProbe) { - log.warn("Multi-probe for MinHash will run brute force nearest neighbor when there " + - "aren't enough candidates.") - } + override protected[ml] def validateDimension(): Unit = { + require(randCoefficients.length == ($(numHashFunctions) * $(numHashTables))) } @Since("2.1.0") @@ -110,8 +107,7 @@ class MinHashModel private[ml] ( */ @Experimental @Since("2.1.0") -class MinHash(override val uid: String) extends LSH[MinHashModel] with HasSeed { - +class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSeed { @Since("2.1.0") override def setInputCol(value: String): this.type = super.setInputCol(value) @@ -120,7 +116,10 @@ class MinHash(override val uid: String) extends LSH[MinHashModel] with HasSeed { override def setOutputCol(value: String): this.type = super.setOutputCol(value) @Since("2.1.0") - override def setOutputDim(value: Int): this.type = super.setOutputDim(value) + override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) + + @Since("2.1.0") + override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) @Since("2.1.0") def this() = { @@ -133,11 +132,12 @@ class MinHash(override val uid: String) extends LSH[MinHashModel] with HasSeed { @Since("2.1.0") override protected[ml] def createRawLSHModel(inputDim: Int): MinHashModel = { - require(inputDim <= MinHash.prime / 2, - s"The input vector dimension $inputDim exceeds the threshold ${MinHash.prime / 2}.") + require(inputDim <= MinHashLSH.prime / 2, + s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.prime / 2}.") val rand = new Random($(seed)) val numEntry = inputDim * 2 - val randCoofs: Array[Int] = Array.fill($(outputDim))(1 + rand.nextInt(MinHash.prime - 1)) + val randCoofs: Array[Int] = Array + .fill($(numHashTables) * $(numHashFunctions))(1 + rand.nextInt(MinHashLSH.prime - 1)) new MinHashModel(uid, numEntry, randCoofs) } @@ -152,12 +152,12 @@ class MinHash(override val uid: String) extends LSH[MinHashModel] with HasSeed { } @Since("2.1.0") -object MinHash extends DefaultParamsReadable[MinHash] { +object MinHashLSH extends DefaultParamsReadable[MinHashLSH] { // A large prime smaller than sqrt(2^63 − 1) private[ml] val prime = 2038074743 @Since("2.1.0") - override def load(path: String): MinHash = super.load(path) + override def load(path: String): MinHashLSH = super.load(path) } @Since("2.1.0") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala similarity index 68% rename from mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala rename to mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 07f95527fcfde..0bee62972c482 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RandomProjectionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext 
import org.apache.spark.sql.Dataset -class RandomProjectionSuite +class BucketedRandomProjectionLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @transient var dataset: Dataset[_] = _ @@ -43,70 +43,76 @@ class RandomProjectionSuite } test("params") { - ParamsSuite.checkParams(new RandomProjection) - val model = new RandomProjectionModel("rp", randUnitVectors = Array(Vectors.dense(1.0, 0.0))) + ParamsSuite.checkParams(new BucketedRandomProjectionLSH) + val model = new BucketedRandomProjectionModel( + "brp", randUnitVectors = Array(Vectors.dense(1.0, 0.0))) ParamsSuite.checkParams(model) } - test("RandomProjection: default params") { - val rp = new RandomProjection - assert(rp.getOutputDim === 1.0) + test("BucketedRandomProjectionLSH: default params") { + val brp = new BucketedRandomProjectionLSH + assert(brp.getNumHashFunctions === 1.0) + assert(brp.getNumHashTables === 1.0) } test("read/write") { - def checkModelData(model: RandomProjectionModel, model2: RandomProjectionModel): Unit = { + def checkModelData( + model: BucketedRandomProjectionModel, + model2: BucketedRandomProjectionModel + ): Unit = { model.randUnitVectors.zip(model2.randUnitVectors) .foreach(pair => assert(pair._1 === pair._2)) } - val mh = new RandomProjection() + val mh = new BucketedRandomProjectionLSH() val settings = Map("inputCol" -> "keys", "outputCol" -> "values", "bucketLength" -> 1.0) testEstimatorAndModelReadWrite(mh, dataset, settings, checkModelData) } test("hashFunction") { - val randUnitVectors = Array(Vectors.dense(0.0, 1.0), Vectors.dense(1.0, 0.0)) - val model = new RandomProjectionModel("rp", randUnitVectors) + val randUnitVectors = Array(Vectors.dense(0.0, 1.0), + Vectors.dense(1.0, 0.0), Vectors.dense(0.25, 0.75), Vectors.dense(0.75, 0.25)) + val model = new BucketedRandomProjectionModel("brp", randUnitVectors) model.set(model.bucketLength, 0.5) + model.set(model.numHashTables, 2) + model.set(model.numHashFunctions, 2) val res = model.hashFunction(Vectors.dense(1.23, 4.56)) - assert(res.equals(Vectors.dense(9.0, 2.0))) + assert(res(0).equals(Vectors.dense(9.0, 2.0))) } - test("keyDistance and hashDistance") { - val model = new RandomProjectionModel("rp", Array(Vectors.dense(0.0, 1.0))) + test("keyDistance") { + val model = new BucketedRandomProjectionModel("brp", Array(Vectors.dense(0.0, 1.0))) val keyDist = model.keyDistance(Vectors.dense(1, 2), Vectors.dense(-2, -2)) - val hashDist = model.hashDistance(Vectors.dense(-5, 5), Vectors.dense(1, 2)) assert(keyDist === 5) - assert(hashDist === 3) } - test("RandomProjection: randUnitVectors") { - val rp = new RandomProjection() - .setOutputDim(20) + test("BucketedRandomProjectionLSH: randUnitVectors") { + val brp = new BucketedRandomProjectionLSH() + .setNumHashFunctions(10) + .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) .setSeed(12345) - val unitVectors = rp.fit(dataset).randUnitVectors + val unitVectors = brp.fit(dataset).randUnitVectors unitVectors.foreach { v: Vector => assert(Vectors.norm(v, 2.0) ~== 1.0 absTol 1e-14) } } - test("RandomProjection: test of LSH property") { + test("BucketedRandomProjectionLSH: test of LSH property") { // Project from 2 dimensional Euclidean Space to 1 dimensions - val rp = new RandomProjection() - .setOutputDim(1) + val brp = new BucketedRandomProjectionLSH() .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) .setSeed(12345) - val (falsePositive, falseNegative) = LSHTest.calculateLSHProperty(dataset, rp, 8.0, 2.0) + val 
(falsePositive, falseNegative) = LSHTest.calculateLSHProperty(dataset, brp, 8.0, 2.0) assert(falsePositive < 0.4) assert(falseNegative < 0.4) } - test("RandomProjection with high dimension data: test of LSH property") { + test("BucketedRandomProjectionLSH with high dimension data: test of LSH property") { val numDim = 100 val data = { for (i <- 0 until numDim; j <- Seq(-2, -1, 1, 2)) @@ -115,29 +121,30 @@ class RandomProjectionSuite val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") // Project from 100 dimensional Euclidean Space to 10 dimensions - val rp = new RandomProjection() - .setOutputDim(10) + val brp = new BucketedRandomProjectionLSH() + .setNumHashFunctions(5) + .setNumHashTables(10) .setInputCol("keys") .setOutputCol("values") .setBucketLength(2.5) .setSeed(12345) - val (falsePositive, falseNegative) = LSHTest.calculateLSHProperty(df, rp, 3.0, 2.0) + val (falsePositive, falseNegative) = LSHTest.calculateLSHProperty(df, brp, 3.0, 2.0) assert(falsePositive < 0.3) assert(falseNegative < 0.3) } - test("approxNearestNeighbors for random projection") { + test("approxNearestNeighbors for bucketed random projection") { val key = Vectors.dense(1.2, 3.4) - val rp = new RandomProjection() - .setOutputDim(2) + val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") .setBucketLength(4.0) .setSeed(12345) - val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(rp, dataset, key, 100, + val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(brp, dataset, key, 100, singleProbe = true) assert(precision >= 0.6) assert(recall >= 0.6) @@ -146,33 +153,34 @@ class RandomProjectionSuite test("approxNearestNeighbors with multiple probing") { val key = Vectors.dense(1.2, 3.4) - val rp = new RandomProjection() - .setOutputDim(20) + val brp = new BucketedRandomProjectionLSH() + .setNumHashFunctions(10) + .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) .setSeed(12345) - val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(rp, dataset, key, 100, + val (precision, recall) = LSHTest.calculateApproxNearestNeighbors(brp, dataset, key, 100, singleProbe = false) assert(precision >= 0.7) assert(recall >= 0.7) } - test("approxSimilarityJoin for random projection on different dataset") { + test("approxSimilarityJoin for bucketed random projection on different dataset") { val data2 = { for (i <- 0 until 24) yield Vectors.dense(10 * sin(Pi / 12 * i), 10 * cos(Pi / 12 * i)) } val dataset2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") - val rp = new RandomProjection() - .setOutputDim(2) + val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") .setBucketLength(4.0) .setSeed(12345) - val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(rp, dataset, dataset2, 1.0) + val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(brp, dataset, dataset2, 1.0) assert(precision == 1.0) assert(recall >= 0.7) } @@ -183,14 +191,14 @@ class RandomProjectionSuite } val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") - val rp = new RandomProjection() - .setOutputDim(2) + val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(2) .setInputCol("keys") .setOutputCol("values") .setBucketLength(4.0) .setSeed(12345) - val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(rp, df, df, 3.0) + val (precision, recall) = LSHTest.calculateApproxSimilarityJoin(brp, df, df, 3.0) 
assert(precision == 1.0) assert(recall >= 0.7) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala index f7ded60fb6542..85b2c6c1cf8f3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala @@ -58,12 +58,13 @@ private[ml] object LSHTest { val outputCol = model.getOutputCol val transformedData = model.transform(dataset) - SchemaUtils.checkColumnType(transformedData.schema, model.getOutputCol, new VectorUDT) + SchemaUtils.checkColumnType( + transformedData.schema, model.getOutputCol, DataTypes.createArrayType(new VectorUDT)) // Perform a cross join and label each pair of same_bucket and distance val pairs = transformedData.as("a").crossJoin(transformedData.as("b")) val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y), DataTypes.DoubleType) - val sameBucket = udf((x: Vector, y: Vector) => model.hashDistance(x, y) == 0.0, + val sameBucket = udf((x: Seq[Vector], y: Seq[Vector]) => model.hashDistance(x, y) == 0.0, DataTypes.BooleanType) val result = pairs .withColumn("same_bucket", sameBucket(col(s"a.$outputCol"), col(s"b.$outputCol"))) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala similarity index 81% rename from mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala rename to mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index d05f693cc961f..cb3f486070140 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.ml.util.DefaultReadWriteTest import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.Dataset -class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @transient var dataset: Dataset[_] = _ @@ -38,14 +38,15 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default } test("params") { - ParamsSuite.checkParams(new MinHash) + ParamsSuite.checkParams(new MinHashLSH) val model = new MinHashModel("mh", numEntries = 2, randCoefficients = Array(1)) ParamsSuite.checkParams(model) } - test("MinHash: default params") { - val rp = new MinHash - assert(rp.getOutputDim === 1.0) + test("MinHashLSH: default params") { + val rp = new MinHashLSH + assert(rp.getNumHashTables === 1.0) + assert(rp.getNumHashFunctions === 1.0) } test("read/write") { @@ -53,32 +54,30 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default assert(model.numEntries === model2.numEntries) assertResult(model.randCoefficients)(model2.randCoefficients) } - val mh = new MinHash() + val mh = new MinHashLSH() val settings = Map("inputCol" -> "keys", "outputCol" -> "values") testEstimatorAndModelReadWrite(mh, dataset, settings, checkModelData) } test("hashFunction") { - val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(0, 1, 3)) + val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(0, 1, 2, 3)) + model.set(model.numHashTables, 2) + model.set(model.numHashFunctions, 2) val res = model.hashFunction(Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0)))) - 
assert(res.equals(Vectors.dense(0.0, 3.0, 4.0))) + assert(res(0).equals(Vectors.dense(0.0, 3.0))) + assert(res(1).equals(Vectors.dense(6.0, 4.0))) } - test("keyDistance and hashDistance") { + test("keyDistance") { val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(1)) val v1 = Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0))) val v2 = Vectors.sparse(10, Seq((1, 1.0), (3, 1.0), (5, 1.0), (7, 1.0), (9, 1.0))) val keyDist = model.keyDistance(v1, v2) - val hashDist1 = model.hashDistance(Vectors.dense(1, 2), Vectors.dense(3, 4)) - val hashDist2 = model.hashDistance(Vectors.dense(1, 2), Vectors.dense(3, 2)) assert(keyDist === 0.5) - assert(hashDist1 === 1.0) - assert(hashDist2 === 0.0) } - test("MinHash: test of LSH property") { - val mh = new MinHash() - .setOutputDim(1) + test("MinHashLSH: test of LSH property") { + val mh = new MinHashLSH() .setInputCol("keys") .setOutputCol("values") .setSeed(12344) @@ -89,8 +88,8 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default } test("approxNearestNeighbors for min hash") { - val mh = new MinHash() - .setOutputDim(20) + val mh = new MinHashLSH() + .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") .setSeed(12345) @@ -104,7 +103,7 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default assert(recall >= 0.7) } - test("approxSimilarityJoin for minhash on different dataset") { + test("approxSimilarityJoin for min hash on different dataset") { val data1 = { for (i <- 0 until 20) yield Vectors.sparse(100, (5 * i until 5 * i + 5).map((_, 1.0))) } @@ -115,8 +114,9 @@ class MinHashSuite extends SparkFunSuite with MLlibTestSparkContext with Default } val df2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") - val mh = new MinHash() - .setOutputDim(20) + val mh = new MinHashLSH() + .setNumHashFunctions(2) + .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") .setSeed(12345) From 0e9250be0142691e9e085ed1260f83f8ed40f5e4 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Sun, 13 Nov 2016 20:38:44 -0800 Subject: [PATCH 07/20] (1) Fix description for numHashFunctions (2) Make numEntries in MinHash private --- mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala | 6 +++--- .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 987c6c70d8809..c9d4e41296328 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -50,9 +50,9 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { * is, the lower the false positive rate. 
* @group param */ - final val numHashFunctions: IntParam = new IntParam(this, "numHashFunctions", "output dimension, where" + - "increasing dimensionality lowers the false negative rate, and decreasing dimensionality" + - " improves the running performance", ParamValidators.gt(0)) + final val numHashFunctions: IntParam = new IntParam(this, "numHashFunctions", "number of hash " + + "functions in each hash table, where increasing the number improves the running performance, " + + "and decreasing it raises the false negative rate", ParamValidators.gt(0)) /** @group getParam */ final def getNumHashFunctions: Int = $(numHashFunctions) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 34a5bb551527e..e37ff187a1641 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.types.StructType @Since("2.1.0") class MinHashModel private[ml] ( override val uid: String, - @Since("2.1.0") val numEntries: Int, + @Since("2.1.0") private[ml] val numEntries: Int, @Since("2.1.0") private[ml] val randCoefficients: Array[Int]) extends LSHModel[MinHashModel] { From adbbefe1777db8fb85a0af59c11e5840d3bc91ee Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Sun, 13 Nov 2016 20:43:30 -0800 Subject: [PATCH 08/20] Add assertion for hashFunction in BucketedRandomProjectionLSHSuite --- .../spark/ml/feature/BucketedRandomProjectionLSHSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 0bee62972c482..0586557e2bb4a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -77,6 +77,7 @@ class BucketedRandomProjectionLSHSuite model.set(model.numHashFunctions, 2) val res = model.hashFunction(Vectors.dense(1.23, 4.56)) assert(res(0).equals(Vectors.dense(9.0, 2.0))) + assert(res(1).equals(Vectors.dense(7.0, 4.0))) } test("keyDistance") { From c115ed3e580d4999c85d02417356e66f8f360bda Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Mon, 14 Nov 2016 09:46:55 -0800 Subject: [PATCH 09/20] Revert AND-amplification for a future PR --- .../feature/BucketedRandomProjectionLSH.scala | 13 +++-------- .../org/apache/spark/ml/feature/LSH.scala | 22 +------------------ .../apache/spark/ml/feature/MinHashLSH.scala | 14 +++--------- .../BucketedRandomProjectionLSHSuite.scala | 13 +++-------- .../spark/ml/feature/MinHashLSHSuite.scala | 11 ++++------ 5 files changed, 14 insertions(+), 59 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index 7835ddfdf8763..798eeba95138a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -79,7 +79,8 @@ class BucketedRandomProjectionModel private[ml]( val hashValues: Array[Double] = randUnitVectors.map({ randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength)) }) - hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray + // TODO: For 
AND-amplification, output vectors of dimension numHashFunctions + hashValues.grouped(1).map(Vectors.dense).toArray } } @@ -94,11 +95,6 @@ class BucketedRandomProjectionModel private[ml]( x.zip(y).map(vectorPair => Vectors.sqdist(vectorPair._1, vectorPair._2)).min } - @Since("2.1.0") - override protected[ml] def validateDimension(): Unit = { - require(randUnitVectors.length == ($(numHashFunctions) * $(numHashTables))) - } - @Since("2.1.0") override def copy(extra: ParamMap): this.type = defaultCopy(extra) @@ -137,9 +133,6 @@ class BucketedRandomProjectionLSH(override val uid: String) @Since("2.1.0") override def setOutputCol(value: String): this.type = super.setOutputCol(value) - @Since("2.1.0") - override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) - @Since("2.1.0") override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) @@ -160,7 +153,7 @@ class BucketedRandomProjectionLSH(override val uid: String) override protected[this] def createRawLSHModel(inputDim: Int): BucketedRandomProjectionModel = { val rand = new Random($(seed)) val randUnitVectors: Array[Vector] = { - Array.fill($(numHashTables) * $(numHashFunctions)) { + Array.fill($(numHashTables)) { val randArray = Array.fill(inputDim)(rand.nextGaussian()) Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray))) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index c9d4e41296328..5a42373b66d35 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -43,24 +43,10 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { "tables, where increasing number of hash tables lowers the false negative rate, and " + "decreasing it improves the running performance", ParamValidators.gt(0)) - /** - * Param for the dimension of LSH AND-amplification. - * - * LSH AND-amplification can be used to reduce the false positive rate. The higher the dimension - * is, the lower the false positive rate. 
- * @group param - */ - final val numHashFunctions: IntParam = new IntParam(this, "numHashFunctions", "number of hash " + - "functions in each hash table, where increasing the number improves the running performance, " + - "and decreasing it raises the false negative rate", ParamValidators.gt(0)) - - /** @group getParam */ - final def getNumHashFunctions: Int = $(numHashFunctions) - /** @group getParam */ final def getNumHashTables: Int = $(numHashTables) - setDefault(numHashTables -> 1, numHashFunctions -> 1) + setDefault(numHashTables -> 1) /** * Transform the Schema for LSH @@ -103,10 +89,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] */ protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double - protected[ml] def validateDimension(): Unit - override def transform(dataset: Dataset[_]): DataFrame = { - validateDimension() transformSchema(dataset.schema, logging = true) val transformUDF = udf(hashFunction, DataTypes.createArrayType(new VectorUDT)) dataset.withColumn($(outputCol), transformUDF(dataset($(inputCol)))) @@ -338,9 +321,6 @@ private[ml] abstract class LSH[T <: LSHModel[T]] /** @group setParam */ def setOutputCol(value: String): this.type = set(outputCol, value) - /** @group setParam */ - def setNumHashFunctions(value: Int): this.type = set(numHashFunctions, value) - /** @group setParam */ def setNumHashTables(value: Int): this.type = set(numHashTables, value) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index e37ff187a1641..ce81182de2436 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -56,7 +56,8 @@ class MinHashModel private[ml] ( (1 + elem) * randCoefficient.toLong % MinHashLSH.prime % numEntries }).min.toDouble }) - hashValues.grouped($(numHashFunctions)).map(Vectors.dense).toArray + // TODO: For AND-amplification, output vectors of dimension numHashFunctions + hashValues.grouped(1).map(Vectors.dense).toArray } } @@ -79,11 +80,6 @@ class MinHashModel private[ml] ( ).min } - @Since("2.1.0") - override protected[ml] def validateDimension(): Unit = { - require(randCoefficients.length == ($(numHashFunctions) * $(numHashTables))) - } - @Since("2.1.0") override def copy(extra: ParamMap): this.type = defaultCopy(extra) @@ -115,9 +111,6 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSee @Since("2.1.0") override def setOutputCol(value: String): this.type = super.setOutputCol(value) - @Since("2.1.0") - override def setNumHashFunctions(value: Int): this.type = super.setNumHashFunctions(value) - @Since("2.1.0") override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value) @@ -136,8 +129,7 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSee s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.prime / 2}.") val rand = new Random($(seed)) val numEntry = inputDim * 2 - val randCoofs: Array[Int] = Array - .fill($(numHashTables) * $(numHashFunctions))(1 + rand.nextInt(MinHashLSH.prime - 1)) + val randCoofs: Array[Int] = Array.fill($(numHashTables))(1 + rand.nextInt(MinHashLSH.prime - 1)) new MinHashModel(uid, numEntry, randCoofs) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 0586557e2bb4a..0d7d59845bb65 
100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -51,7 +51,6 @@ class BucketedRandomProjectionLSHSuite test("BucketedRandomProjectionLSH: default params") { val brp = new BucketedRandomProjectionLSH - assert(brp.getNumHashFunctions === 1.0) assert(brp.getNumHashTables === 1.0) } @@ -69,15 +68,12 @@ class BucketedRandomProjectionLSHSuite } test("hashFunction") { - val randUnitVectors = Array(Vectors.dense(0.0, 1.0), - Vectors.dense(1.0, 0.0), Vectors.dense(0.25, 0.75), Vectors.dense(0.75, 0.25)) + val randUnitVectors = Array(Vectors.dense(0.0, 1.0), Vectors.dense(1.0, 0.0)) val model = new BucketedRandomProjectionModel("brp", randUnitVectors) model.set(model.bucketLength, 0.5) - model.set(model.numHashTables, 2) - model.set(model.numHashFunctions, 2) val res = model.hashFunction(Vectors.dense(1.23, 4.56)) - assert(res(0).equals(Vectors.dense(9.0, 2.0))) - assert(res(1).equals(Vectors.dense(7.0, 4.0))) + assert(res(0).equals(Vectors.dense(9.0))) + assert(res(1).equals(Vectors.dense(2.0))) } test("keyDistance") { @@ -88,7 +84,6 @@ class BucketedRandomProjectionLSHSuite test("BucketedRandomProjectionLSH: randUnitVectors") { val brp = new BucketedRandomProjectionLSH() - .setNumHashFunctions(10) .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") @@ -123,7 +118,6 @@ class BucketedRandomProjectionLSHSuite // Project from 100 dimensional Euclidean Space to 10 dimensions val brp = new BucketedRandomProjectionLSH() - .setNumHashFunctions(5) .setNumHashTables(10) .setInputCol("keys") .setOutputCol("values") @@ -155,7 +149,6 @@ class BucketedRandomProjectionLSHSuite val key = Vectors.dense(1.2, 3.4) val brp = new BucketedRandomProjectionLSH() - .setNumHashFunctions(10) .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index cb3f486070140..0ae761b5e24b5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -46,7 +46,6 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa test("MinHashLSH: default params") { val rp = new MinHashLSH assert(rp.getNumHashTables === 1.0) - assert(rp.getNumHashFunctions === 1.0) } test("read/write") { @@ -60,12 +59,11 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("hashFunction") { - val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(0, 1, 2, 3)) - model.set(model.numHashTables, 2) - model.set(model.numHashFunctions, 2) + val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(0, 1, 3)) val res = model.hashFunction(Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0)))) - assert(res(0).equals(Vectors.dense(0.0, 3.0))) - assert(res(1).equals(Vectors.dense(6.0, 4.0))) + assert(res(0).equals(Vectors.dense(0.0))) + assert(res(1).equals(Vectors.dense(3.0))) + assert(res(2).equals(Vectors.dense(4.0))) } test("keyDistance") { @@ -115,7 +113,6 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val df2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("keys") val mh = new MinHashLSH() - .setNumHashFunctions(2) .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") From 
033ae5db1092ab2cd426f974c3e8de594461ca20 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 15 Nov 2016 14:17:00 -0800 Subject: [PATCH 10/20] Code Review Comments --- .../feature/BucketedRandomProjectionLSH.scala | 50 ++++++------ .../org/apache/spark/ml/feature/LSH.scala | 31 ++------ .../apache/spark/ml/feature/MinHashLSH.scala | 77 ++++++++++--------- .../BucketedRandomProjectionLSHSuite.scala | 29 +++++-- .../spark/ml/feature/MinHashLSHSuite.scala | 55 +++++++++++-- 5 files changed, 147 insertions(+), 95 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index 798eeba95138a..c3c71032112b8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType * * Params for [[BucketedRandomProjectionLSH]]. */ -private[ml] trait BucketedRandomProjectionParams extends Params { +private[ml] trait BucketedRandomProjectionLSHParams extends Params { /** * The length of each hash bucket, a larger bucket lowers the false negative rate. The number of @@ -68,10 +68,10 @@ private[ml] trait BucketedRandomProjectionParams extends Params { */ @Experimental @Since("2.1.0") -class BucketedRandomProjectionModel private[ml]( +class BucketedRandomProjectionLSHModel private[ml]( override val uid: String, - @Since("2.1.0") private[ml] val randUnitVectors: Array[Vector]) - extends LSHModel[BucketedRandomProjectionModel] with BucketedRandomProjectionParams { + private[ml] val randUnitVectors: Array[Vector]) + extends LSHModel[BucketedRandomProjectionLSHModel] with BucketedRandomProjectionLSHParams { @Since("2.1.0") override protected[ml] val hashFunction: Vector => Array[Vector] = { @@ -79,7 +79,7 @@ class BucketedRandomProjectionModel private[ml]( val hashValues: Array[Double] = randUnitVectors.map({ randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength)) }) - // TODO: For AND-amplification, output vectors of dimension numHashFunctions + // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 hashValues.grouped(1).map(Vectors.dense).toArray } } @@ -100,7 +100,7 @@ class BucketedRandomProjectionModel private[ml]( @Since("2.1.0") override def write: MLWriter = { - new BucketedRandomProjectionModel.BucketedRandomProjectionModelWriter(this) + new BucketedRandomProjectionLSHModel.BucketedRandomProjectionLSHModelWriter(this) } } @@ -111,8 +111,8 @@ class BucketedRandomProjectionModel private[ml]( * Euclidean distance metrics. * * The input is dense or sparse vectors, each of which represents a point in the Euclidean - * distance space. The output will be vectors of configurable dimension. Hash value in the same - * dimension is calculated by the same hash function. + * distance space. The output will be vectors of configurable dimension. Hash values in the + * same dimension are calculated by the same hash function. 
* * References: * @@ -125,7 +125,8 @@ class BucketedRandomProjectionModel private[ml]( @Experimental @Since("2.1.0") class BucketedRandomProjectionLSH(override val uid: String) - extends LSH[BucketedRandomProjectionModel] with BucketedRandomProjectionParams with HasSeed { + extends LSH[BucketedRandomProjectionLSHModel] + with BucketedRandomProjectionLSHParams with HasSeed { @Since("2.1.0") override def setInputCol(value: String): this.type = super.setInputCol(value) @@ -138,7 +139,7 @@ class BucketedRandomProjectionLSH(override val uid: String) @Since("2.1.0") def this() = { - this(Identifiable.randomUID("random projection")) + this(Identifiable.randomUID("brp-lsh")) } /** @group setParam */ @@ -150,7 +151,9 @@ class BucketedRandomProjectionLSH(override val uid: String) def setSeed(value: Long): this.type = set(seed, value) @Since("2.1.0") - override protected[this] def createRawLSHModel(inputDim: Int): BucketedRandomProjectionModel = { + override protected[this] def createRawLSHModel( + inputDim: Int + ): BucketedRandomProjectionLSHModel = { val rand = new Random($(seed)) val randUnitVectors: Array[Vector] = { Array.fill($(numHashTables)) { @@ -158,7 +161,7 @@ class BucketedRandomProjectionLSH(override val uid: String) Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray))) } } - new BucketedRandomProjectionModel(uid, randUnitVectors) + new BucketedRandomProjectionLSHModel(uid, randUnitVectors) } @Since("2.1.0") @@ -179,18 +182,18 @@ object BucketedRandomProjectionLSH extends DefaultParamsReadable[BucketedRandomP } @Since("2.1.0") -object BucketedRandomProjectionModel extends MLReadable[BucketedRandomProjectionModel] { +object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProjectionLSHModel] { @Since("2.1.0") - override def read: MLReader[BucketedRandomProjectionModel] = { - new BucketedRandomProjectionModelReader + override def read: MLReader[BucketedRandomProjectionLSHModel] = { + new BucketedRandomProjectionLSHModelReader } @Since("2.1.0") - override def load(path: String): BucketedRandomProjectionModel = super.load(path) + override def load(path: String): BucketedRandomProjectionLSHModel = super.load(path) - private[BucketedRandomProjectionModel] class BucketedRandomProjectionModelWriter( - instance: BucketedRandomProjectionModel) extends MLWriter { + private[BucketedRandomProjectionLSHModel] class BucketedRandomProjectionLSHModelWriter( + instance: BucketedRandomProjectionLSHModel) extends MLWriter { // TODO: Save using the existing format of Array[Vector] once SPARK-12878 is resolved. 
private case class Data(randUnitVectors: Matrix) @@ -208,13 +211,13 @@ object BucketedRandomProjectionModel extends MLReadable[BucketedRandomProjection } } - private class BucketedRandomProjectionModelReader - extends MLReader[BucketedRandomProjectionModel] { + private class BucketedRandomProjectionLSHModelReader + extends MLReader[BucketedRandomProjectionLSHModel] { /** Checked against metadata when loading model */ - private val className = classOf[BucketedRandomProjectionModel].getName + private val className = classOf[BucketedRandomProjectionLSHModel].getName - override def load(path: String): BucketedRandomProjectionModel = { + override def load(path: String): BucketedRandomProjectionLSHModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString @@ -222,7 +225,8 @@ object BucketedRandomProjectionModel extends MLReadable[BucketedRandomProjection val Row(randUnitVectors: Matrix) = MLUtils.convertMatrixColumnsToML(data, "randUnitVectors") .select("randUnitVectors") .head() - val model = new BucketedRandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray) + val model = new BucketedRandomProjectionLSHModel(metadata.uid, + randUnitVectors.rowIter.toArray) DefaultParamsReader.getAndSetParams(model, metadata) model diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 5a42373b66d35..6b822d490eb5a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -33,10 +33,10 @@ import org.apache.spark.sql.types._ */ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { /** - * Param for the dimension of LSH OR-amplification. + * Param for the number of hash tables used in LSH OR-amplification. * - * LSH OR-amplification can be used to reduce the false negative rate. The higher the dimension - * is, the lower the false negative rate. + * LSH OR-amplification can be used to reduce the false negative rate. Higher values for this + * param lead to a reduced false negative rate, at the expense of added computational complexity. * @group param */ final val numHashTables: IntParam = new IntParam(this, "numHashTables", "number of hash " + @@ -66,7 +66,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] self: T => /** - * The hash function of LSH, mapping an input feature to multiple vectors + * The hash function of LSH, mapping an input feature vector to multiple hash vectors. * @return The mapping of LSH function. */ protected[ml] val hashFunction: Vector => Array[Vector] @@ -99,26 +99,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] validateAndTransformSchema(schema) } - /** - * Given a large dataset and an item, approximately find at most k items which have the closest - * distance to the item. If the [[outputCol]] is missing, the method will transform the data; if - * the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the - * transformed data when necessary. 
- * - * This method implements two ways of fetching k nearest neighbors: - * - Single-probe: Fast, return at most k elements (Probing only one buckets) - * - Multi-probe: Slow, return exact k elements (Probing multiple buckets close to the key) - * - * Currently it is made private since more discussion is needed for Multi-probe - * - * @param dataset the dataset to search for nearest neighbors of the key - * @param key Feature vector representing the item to search for - * @param numNearestNeighbors The maximum number of nearest neighbors - * @param singleProbe True for using single-probe; false for multi-probe - * @param distCol Output column for storing the distance between each result row and the key - * @return A dataset containing at most k items closest to the key. A distCol is added to show - * the distance between each row and the key. - */ + // TODO: Fix the MultiProbe NN Search in SPARK-18454 private[feature] def approxNearestNeighbors( dataset: Dataset[_], key: Vector, @@ -179,7 +160,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * @return A dataset containing at most k items closest to the key. A distCol is added to show * the distance between each row and the key. */ - private[feature] def approxNearestNeighbors( + def approxNearestNeighbors( dataset: Dataset[_], key: Vector, numNearestNeighbors: Int, diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index ce81182de2436..370a3a79da924 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -32,31 +32,31 @@ import org.apache.spark.sql.types.StructType * :: Experimental :: * * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function is - * a perfect hash function for a specific set `S` with cardinality equal to a half of `numEntries`: - * `h_i(x) = ((x \cdot k_i) \mod prime) \mod numEntries` + * a perfect hash function for a specific set `S` with cardinality equal to `numEntries`: + * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` * * @param numEntries The number of entries of the hash functions. * @param randCoefficients An array of random coefficients, each used by one hash function. 
*/ @Experimental @Since("2.1.0") -class MinHashModel private[ml] ( +class MinHashLSHModel private[ml]( override val uid: String, - @Since("2.1.0") private[ml] val numEntries: Int, - @Since("2.1.0") private[ml] val randCoefficients: Array[Int]) - extends LSHModel[MinHashModel] { + private[ml] val numEntries: Int, + private[ml] val randCoefficients: Array[(Int, Int)]) + extends LSHModel[MinHashLSHModel] { @Since("2.1.0") override protected[ml] val hashFunction: Vector => Array[Vector] = { elems: Vector => { require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.") val elemsList = elems.toSparse.indices.toList - val hashValues = randCoefficients.map({ randCoefficient: Int => - elemsList.map({ elem: Int => - (1 + elem) * randCoefficient.toLong % MinHashLSH.prime % numEntries - }).min.toDouble + val hashValues = randCoefficients.map({ case (a: Int, b: Int) => + elemsList.map { elem: Int => + ((1 + elem) * a + b) % MinHashLSH.HASH_PRIME % numEntries + }.min.toDouble }) - // TODO: For AND-amplification, output vectors of dimension numHashFunctions + // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 hashValues.grouped(1).map(Vectors.dense).toArray } } @@ -74,7 +74,7 @@ class MinHashModel private[ml] ( @Since("2.1.0") override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = { // Since it's generated by hashing, it will be a pair of dense vectors. - // TODO: This hashDistance function is controversial. Requires more discussion. + // TODO: This hashDistance function requires more discussion in SPARK-18454 x.zip(y).map(vectorPair => vectorPair._1.toArray.zip(vectorPair._2.toArray).count(pair => pair._1 != pair._2) ).min @@ -84,7 +84,7 @@ class MinHashModel private[ml] ( override def copy(extra: ParamMap): this.type = defaultCopy(extra) @Since("2.1.0") - override def write: MLWriter = new MinHashModel.MinHashModelWriter(this) + override def write: MLWriter = new MinHashLSHModel.MinHashLSHModelWriter(this) } /** @@ -93,17 +93,17 @@ class MinHashModel private[ml] ( * LSH class for Jaccard distance. * * The input can be dense or sparse vectors, but it is more efficient if it is sparse. For example, - * `Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)])` - * means there are 10 elements in the space. This set contains elem 2, elem 3 and elem 5. - * Also, any input vector must have at least 1 non-zero indices, and all non-zero values are treated - * as binary "1" values. + * `Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0)))` + * means there are 10 elements in the space. This set contains non-zero values at indices 2, 3, and + * 5. Also, any input vector must have at least 1 non-zero index, and all non-zero values are + * treated as binary "1" values. 
* * References: * [[https://en.wikipedia.org/wiki/MinHash Wikipedia on MinHash]] */ @Experimental @Since("2.1.0") -class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSeed { +class MinHashLSH(override val uid: String) extends LSH[MinHashLSHModel] with HasSeed { @Since("2.1.0") override def setInputCol(value: String): this.type = super.setInputCol(value) @@ -116,7 +116,7 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSee @Since("2.1.0") def this() = { - this(Identifiable.randomUID("min hash")) + this(Identifiable.randomUID("mh-lsh")) } /** @group setParam */ @@ -124,13 +124,15 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSee def setSeed(value: Long): this.type = set(seed, value) @Since("2.1.0") - override protected[ml] def createRawLSHModel(inputDim: Int): MinHashModel = { - require(inputDim <= MinHashLSH.prime / 2, - s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.prime / 2}.") + override protected[ml] def createRawLSHModel(inputDim: Int): MinHashLSHModel = { + require(inputDim <= MinHashLSH.HASH_PRIME, + s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.HASH_PRIME}.") val rand = new Random($(seed)) - val numEntry = inputDim * 2 - val randCoofs: Array[Int] = Array.fill($(numHashTables))(1 + rand.nextInt(MinHashLSH.prime - 1)) - new MinHashModel(uid, numEntry, randCoofs) + val numEntry = inputDim + val randCoefs: Array[(Int, Int)] = Array.fill(2 * $(numHashTables)) { + (1 + rand.nextInt(MinHashLSH.HASH_PRIME - 1), rand.nextInt(MinHashLSH.HASH_PRIME - 1)) + } + new MinHashLSHModel(uid, numEntry, randCoefs) } @Since("2.1.0") @@ -146,46 +148,49 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashModel] with HasSee @Since("2.1.0") object MinHashLSH extends DefaultParamsReadable[MinHashLSH] { // A large prime smaller than sqrt(2^63 − 1) - private[ml] val prime = 2038074743 + private[ml] val HASH_PRIME = 2038074743 @Since("2.1.0") override def load(path: String): MinHashLSH = super.load(path) } @Since("2.1.0") -object MinHashModel extends MLReadable[MinHashModel] { +object MinHashLSHModel extends MLReadable[MinHashLSHModel] { @Since("2.1.0") - override def read: MLReader[MinHashModel] = new MinHashModelReader + override def read: MLReader[MinHashLSHModel] = new MinHashLSHModelReader @Since("2.1.0") - override def load(path: String): MinHashModel = super.load(path) + override def load(path: String): MinHashLSHModel = super.load(path) - private[MinHashModel] class MinHashModelWriter(instance: MinHashModel) extends MLWriter { + private[MinHashLSHModel] class MinHashLSHModelWriter(instance: MinHashLSHModel) + extends MLWriter { private case class Data(numEntries: Int, randCoefficients: Array[Int]) override protected def saveImpl(path: String): Unit = { DefaultParamsWriter.saveMetadata(instance, path, sc) - val data = Data(instance.numEntries, instance.randCoefficients) + val data = Data(instance.numEntries, instance.randCoefficients + .flatMap(tuple => Array(tuple._1, tuple._2))) val dataPath = new Path(path, "data").toString sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } } - private class MinHashModelReader extends MLReader[MinHashModel] { + private class MinHashLSHModelReader extends MLReader[MinHashLSHModel] { /** Checked against metadata when loading model */ - private val className = classOf[MinHashModel].getName + private val className = classOf[MinHashLSHModel].getName - override def load(path: String): 
MinHashModel = { + override def load(path: String): MinHashLSHModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString val data = sparkSession.read.parquet(dataPath).select("numEntries", "randCoefficients").head() val numEntries = data.getAs[Int](0) - val randCoefficients = data.getAs[Seq[Int]](1).toArray - val model = new MinHashModel(metadata.uid, numEntries, randCoefficients) + val randCoefficients = data.getAs[Seq[Int]](1).grouped(2) + .map(tuple => (tuple(0), tuple(1))).toArray + val model = new MinHashLSHModel(metadata.uid, numEntries, randCoefficients) DefaultParamsReader.getAndSetParams(model, metadata) model diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 0d7d59845bb65..06db78902efc4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -44,7 +44,7 @@ class BucketedRandomProjectionLSHSuite test("params") { ParamsSuite.checkParams(new BucketedRandomProjectionLSH) - val model = new BucketedRandomProjectionModel( + val model = new BucketedRandomProjectionLSHModel( "brp", randUnitVectors = Array(Vectors.dense(1.0, 0.0))) ParamsSuite.checkParams(model) } @@ -56,8 +56,8 @@ class BucketedRandomProjectionLSHSuite test("read/write") { def checkModelData( - model: BucketedRandomProjectionModel, - model2: BucketedRandomProjectionModel + model: BucketedRandomProjectionLSHModel, + model2: BucketedRandomProjectionLSHModel ): Unit = { model.randUnitVectors.zip(model2.randUnitVectors) .foreach(pair => assert(pair._1 === pair._2)) @@ -69,7 +69,7 @@ class BucketedRandomProjectionLSHSuite test("hashFunction") { val randUnitVectors = Array(Vectors.dense(0.0, 1.0), Vectors.dense(1.0, 0.0)) - val model = new BucketedRandomProjectionModel("brp", randUnitVectors) + val model = new BucketedRandomProjectionLSHModel("brp", randUnitVectors) model.set(model.bucketLength, 0.5) val res = model.hashFunction(Vectors.dense(1.23, 4.56)) assert(res(0).equals(Vectors.dense(9.0))) @@ -77,7 +77,7 @@ class BucketedRandomProjectionLSHSuite } test("keyDistance") { - val model = new BucketedRandomProjectionModel("brp", Array(Vectors.dense(0.0, 1.0))) + val model = new BucketedRandomProjectionLSHModel("brp", Array(Vectors.dense(0.0, 1.0))) val keyDist = model.keyDistance(Vectors.dense(1, 2), Vectors.dense(-2, -2)) assert(keyDist === 5) } @@ -161,6 +161,25 @@ class BucketedRandomProjectionLSHSuite assert(recall >= 0.7) } + test("approxNearestNeighbors for numNeighbors <= 0") { + val key = Vectors.dense(1.2, 3.4) + + val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(20) + .setInputCol("keys") + .setOutputCol("values") + .setBucketLength(1.0) + .setSeed(12345) + + val model = brp.fit(dataset) + intercept[IllegalArgumentException] { + model.approxNearestNeighbors(dataset, key, 0) + } + intercept[IllegalArgumentException] { + model.approxNearestNeighbors(dataset, key, -1) + } + } + test("approxSimilarityJoin for bucketed random projection on different dataset") { val data2 = { for (i <- 0 until 24) yield Vectors.dense(10 * sin(Pi / 12 * i), 10 * cos(Pi / 12 * i)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index 0ae761b5e24b5..ac57261fd8fbd 100644 
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -39,7 +39,7 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa test("params") { ParamsSuite.checkParams(new MinHashLSH) - val model = new MinHashModel("mh", numEntries = 2, randCoefficients = Array(1)) + val model = new MinHashLSHModel("mh", numEntries = 2, randCoefficients = Array((1, 0))) ParamsSuite.checkParams(model) } @@ -49,7 +49,7 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("read/write") { - def checkModelData(model: MinHashModel, model2: MinHashModel): Unit = { + def checkModelData(model: MinHashLSHModel, model2: MinHashLSHModel): Unit = { assert(model.numEntries === model2.numEntries) assertResult(model.randCoefficients)(model2.randCoefficients) } @@ -59,15 +59,24 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("hashFunction") { - val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(0, 1, 3)) + val model = new MinHashLSHModel("mh", numEntries = 20, + randCoefficients = Array((0, 1), (1, 2), (3, 0))) val res = model.hashFunction(Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0)))) - assert(res(0).equals(Vectors.dense(0.0))) - assert(res(1).equals(Vectors.dense(3.0))) + assert(res(0).equals(Vectors.dense(1.0))) + assert(res(1).equals(Vectors.dense(5.0))) assert(res(2).equals(Vectors.dense(4.0))) } + test("hashFunction: empty vector") { + val model = new MinHashLSHModel("mh", numEntries = 20, + randCoefficients = Array((0, 1), (1, 2), (3, 0))) + intercept[IllegalArgumentException] { + model.hashFunction(Vectors.sparse(10, Seq())) + } + } + test("keyDistance") { - val model = new MinHashModel("mh", numEntries = 20, randCoefficients = Array(1)) + val model = new MinHashLSHModel("mh", numEntries = 20, randCoefficients = Array((1, 0))) val v1 = Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0))) val v2 = Vectors.sparse(10, Seq((1, 1.0), (3, 1.0), (5, 1.0), (7, 1.0), (9, 1.0))) val keyDist = model.keyDistance(v1, v2) @@ -85,6 +94,21 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(falseNegative < 0.3) } + test("MinHashLSH: test of inputDim > prime") { + val mh = new MinHashLSH() + .setInputCol("keys") + .setOutputCol("values") + .setSeed(12344) + + val data = { + for (i <- 0 to 95) yield Vectors.sparse(Int.MaxValue, (i until i + 5).map((_, 1.0))) + } + val badDataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") + intercept[IllegalArgumentException] { + mh.fit(badDataset) + } + } + test("approxNearestNeighbors for min hash") { val mh = new MinHashLSH() .setNumHashTables(20) @@ -101,6 +125,25 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa assert(recall >= 0.7) } + test("approxNearestNeighbors for numNeighbors <= 0") { + val mh = new MinHashLSH() + .setNumHashTables(20) + .setInputCol("keys") + .setOutputCol("values") + .setSeed(12345) + + val key: Vector = Vectors.sparse(100, + (0 until 100).filter(_.toString.contains("1")).map((_, 1.0))) + + val model = mh.fit(dataset) + intercept[IllegalArgumentException] { + model.approxNearestNeighbors(dataset, key, 0) + } + intercept[IllegalArgumentException] { + model.approxNearestNeighbors(dataset, key, -1) + } + } + test("approxSimilarityJoin for min hash on different dataset") { val data1 = { for (i <- 0 until 20) yield 
Vectors.sparse(100, (5 * i until 5 * i + 5).map((_, 1.0))) From c597f4c83519af38a9749acd71078ac20ef20c14 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 15 Nov 2016 17:14:21 -0800 Subject: [PATCH 11/20] Add unit tests to run on Jenkins. --- .../BucketedRandomProjectionLSHSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 06db78902efc4..891613212ff7a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -215,4 +215,24 @@ class BucketedRandomProjectionLSHSuite assert(precision == 1.0) assert(recall >= 0.7) } + + test("memory leak test") { + val numDim = 50 + val data = { + for (i <- 0 until numDim; j <- Seq(-2, -1, 1, 2)) + yield Vectors.sparse(numDim, Seq((i, j.toDouble))) + } + val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") + + // Project from 100 dimensional Euclidean Space to 10 dimensions + val brp = new BucketedRandomProjectionLSH() + .setNumHashTables(10) + .setInputCol("keys") + .setOutputCol("values") + .setBucketLength(2.5) + .setSeed(12345) + val model = brp.fit(df) + val joined = model.approxSimilarityJoin(df, df, Double.MaxValue, "distCol") + joined.show() + } } From d75987591c68aaae5bd007a92f3587193edd7b2a Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 15 Nov 2016 18:58:33 -0800 Subject: [PATCH 12/20] Add unit tests to run on Jenkins. --- .../spark/ml/feature/BucketedRandomProjectionLSHSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 891613212ff7a..b5da18e748cce 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -217,7 +217,7 @@ class BucketedRandomProjectionLSHSuite } test("memory leak test") { - val numDim = 50 + val numDim = 10 val data = { for (i <- 0 until numDim; j <- Seq(-2, -1, 1, 2)) yield Vectors.sparse(numDim, Seq((i, j.toDouble))) From 596eb068d552d51cb3478afd9b05a2cd02510309 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Thu, 17 Nov 2016 14:55:23 -0800 Subject: [PATCH 13/20] CR comments --- .../feature/BucketedRandomProjectionLSH.scala | 3 +-- .../org/apache/spark/ml/feature/LSH.scala | 2 ++ .../apache/spark/ml/feature/MinHashLSH.scala | 4 ++-- .../BucketedRandomProjectionLSHSuite.scala | 22 +------------------ .../spark/ml/feature/MinHashLSHSuite.scala | 3 ++- 5 files changed, 8 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index c3c71032112b8..a3407d4d82fd0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -152,8 +152,7 @@ class BucketedRandomProjectionLSH(override val uid: String) @Since("2.1.0") override protected[this] def createRawLSHModel( - inputDim: Int - ): BucketedRandomProjectionLSHModel = { + inputDim: Int): 
BucketedRandomProjectionLSHModel = { val rand = new Random($(seed)) val randUnitVectors: Array[Vector] = { Array.fill($(numHashTables)) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index 6b822d490eb5a..cd155b65c5038 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -153,6 +153,8 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the * transformed data when necessary. * + * NOTE: This method is experimental and will likely change behavior in the next release. + * * @param dataset the dataset to search for nearest neighbors of the key * @param key Feature vector representing the item to search for * @param numNearestNeighbors The maximum number of nearest neighbors diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 370a3a79da924..8f8653bb78f43 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -32,11 +32,11 @@ import org.apache.spark.sql.types.StructType * :: Experimental :: * * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function is - * a perfect hash function for a specific set `S` with cardinality equal to `numEntries`: + * picked from a hash function for a specific set `S` with cardinality equal to `numEntries`: * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` * * @param numEntries The number of entries of the hash functions. - * @param randCoefficients An array of random coefficients, each used by one hash function. + * @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function. 
*/ @Experimental @Since("2.1.0") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index b5da18e748cce..15af19fc7d003 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -72,6 +72,7 @@ class BucketedRandomProjectionLSHSuite val model = new BucketedRandomProjectionLSHModel("brp", randUnitVectors) model.set(model.bucketLength, 0.5) val res = model.hashFunction(Vectors.dense(1.23, 4.56)) + assert(res.length == 2) assert(res(0).equals(Vectors.dense(9.0))) assert(res(1).equals(Vectors.dense(2.0))) } @@ -165,7 +166,6 @@ class BucketedRandomProjectionLSHSuite val key = Vectors.dense(1.2, 3.4) val brp = new BucketedRandomProjectionLSH() - .setNumHashTables(20) .setInputCol("keys") .setOutputCol("values") .setBucketLength(1.0) @@ -215,24 +215,4 @@ class BucketedRandomProjectionLSHSuite assert(precision == 1.0) assert(recall >= 0.7) } - - test("memory leak test") { - val numDim = 10 - val data = { - for (i <- 0 until numDim; j <- Seq(-2, -1, 1, 2)) - yield Vectors.sparse(numDim, Seq((i, j.toDouble))) - } - val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") - - // Project from 100 dimensional Euclidean Space to 10 dimensions - val brp = new BucketedRandomProjectionLSH() - .setNumHashTables(10) - .setInputCol("keys") - .setOutputCol("values") - .setBucketLength(2.5) - .setSeed(12345) - val model = brp.fit(df) - val joined = model.approxSimilarityJoin(df, df, Double.MaxValue, "distCol") - joined.show() - } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index ac57261fd8fbd..c70956283cb70 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -62,6 +62,7 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa val model = new MinHashLSHModel("mh", numEntries = 20, randCoefficients = Array((0, 1), (1, 2), (3, 0))) val res = model.hashFunction(Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0)))) + assert(res.length == 3) assert(res(0).equals(Vectors.dense(1.0))) assert(res(1).equals(Vectors.dense(5.0))) assert(res(2).equals(Vectors.dense(4.0))) @@ -101,7 +102,7 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa .setSeed(12344) val data = { - for (i <- 0 to 95) yield Vectors.sparse(Int.MaxValue, (i until i + 5).map((_, 1.0))) + for (i <- 0 to 2) yield Vectors.sparse(Int.MaxValue, (i until i + 5).map((_, 1.0))) } val badDataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") intercept[IllegalArgumentException] { From 3d0810f25e22f6b8d64a907ade9cca14de7be763 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Thu, 17 Nov 2016 15:17:33 -0800 Subject: [PATCH 14/20] Update comments --- .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 8f8653bb78f43..8fa18ccd4cdfa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ 
b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -94,8 +94,8 @@ class MinHashLSHModel private[ml]( * * The input can be dense or sparse vectors, but it is more efficient if it is sparse. For example, * `Vectors.sparse(10, Array((2, 1.0), (3, 1.0), (5, 1.0)))` - * means there are 10 elements in the space. This set contains non-zero values at indices 2, 3, and - * 5. Also, any input vector must have at least 1 non-zero index, and all non-zero values are + * means there are 10 elements in the space. This set contains elements 2, 3, and 5. Also, any + * input vector must have at least 1 non-zero index, and all non-zero values are * treated as binary "1" values. * * References: From 257ef1955696b937a0b53feb0ebde136f482dae1 Mon Sep 17 00:00:00 2001 From: Yunni Date: Thu, 17 Nov 2016 21:50:35 -0500 Subject: [PATCH 15/20] Add scaladoc for approximately min-wise independence --- .../scala/org/apache/spark/ml/feature/MinHashLSH.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 8fa18ccd4cdfa..44bbb76b1cc2f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -32,9 +32,15 @@ import org.apache.spark.sql.types.StructType * :: Experimental :: * * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function is - * picked from a hash function for a specific set `S` with cardinality equal to `numEntries`: + * picked from a hash family for a specific set `S` with cardinality equal to `numEntries`: * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` * + * This hash family is a 2-universal hash family which is approximately min-wise independent. + * + * Reference: + * [[http://people.csail.mit.edu/mip/papers/kwise-lb/kwise-lb.pdf On the k-Independence Required by + * Linear Probing and Minwise Independence]] + * * @param numEntries The number of entries of the hash functions. * @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function. */ From 2c264b7660d8be68428f573be67f2720ee9a3c51 Mon Sep 17 00:00:00 2001 From: Yunni Date: Thu, 17 Nov 2016 22:06:10 -0500 Subject: [PATCH 16/20] Change documentation reference --- .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 44bbb76b1cc2f..7c76d26bfb668 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -35,11 +35,11 @@ import org.apache.spark.sql.types.StructType * picked from a hash family for a specific set `S` with cardinality equal to `numEntries`: * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` * - * This hash family is a 2-universal hash family which is approximately min-wise independent. + * This hash family is approximately min-wise independent according to the reference. 
* * Reference: - * [[http://people.csail.mit.edu/mip/papers/kwise-lb/kwise-lb.pdf On the k-Independence Required by - * Linear Probing and Minwise Independence]] + * [[http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.121.8215&rep=rep1&type=pdf Min-wise + * independent permutations]] * * @param numEntries The number of entries of the hash functions. * @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function. From 36ca278ade551dbb5d7a9acbf0d0a4bef73c4a8f Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Fri, 18 Nov 2016 17:48:10 -0800 Subject: [PATCH 17/20] Removing modulo numEntries --- .../apache/spark/ml/feature/MinHashLSH.scala | 19 +++++++------------ .../spark/ml/feature/MinHashLSHSuite.scala | 13 +++++-------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 8fa18ccd4cdfa..c008b79920627 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -35,14 +35,12 @@ import org.apache.spark.sql.types.StructType * picked from a hash function for a specific set `S` with cardinality equal to `numEntries`: * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` * - * @param numEntries The number of entries of the hash functions. * @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function. */ @Experimental @Since("2.1.0") class MinHashLSHModel private[ml]( override val uid: String, - private[ml] val numEntries: Int, private[ml] val randCoefficients: Array[(Int, Int)]) extends LSHModel[MinHashLSHModel] { @@ -53,7 +51,7 @@ class MinHashLSHModel private[ml]( val elemsList = elems.toSparse.indices.toList val hashValues = randCoefficients.map({ case (a: Int, b: Int) => elemsList.map { elem: Int => - ((1 + elem) * a + b) % MinHashLSH.HASH_PRIME % numEntries + ((1 + elem) * a + b) % MinHashLSH.HASH_PRIME }.min.toDouble }) // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 @@ -128,11 +126,10 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashLSHModel] with Has require(inputDim <= MinHashLSH.HASH_PRIME, s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.HASH_PRIME}.") val rand = new Random($(seed)) - val numEntry = inputDim val randCoefs: Array[(Int, Int)] = Array.fill(2 * $(numHashTables)) { (1 + rand.nextInt(MinHashLSH.HASH_PRIME - 1), rand.nextInt(MinHashLSH.HASH_PRIME - 1)) } - new MinHashLSHModel(uid, numEntry, randCoefs) + new MinHashLSHModel(uid, randCoefs) } @Since("2.1.0") @@ -166,12 +163,11 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] { private[MinHashLSHModel] class MinHashLSHModelWriter(instance: MinHashLSHModel) extends MLWriter { - private case class Data(numEntries: Int, randCoefficients: Array[Int]) + private case class Data(randCoefficients: Array[Int]) override protected def saveImpl(path: String): Unit = { DefaultParamsWriter.saveMetadata(instance, path, sc) - val data = Data(instance.numEntries, instance.randCoefficients - .flatMap(tuple => Array(tuple._1, tuple._2))) + val data = Data(instance.randCoefficients.flatMap(tuple => Array(tuple._1, tuple._2))) val dataPath = new Path(path, "data").toString sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) } @@ -186,11 +182,10 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] { val metadata = 
DefaultParamsReader.loadMetadata(path, sc, className) val dataPath = new Path(path, "data").toString - val data = sparkSession.read.parquet(dataPath).select("numEntries", "randCoefficients").head() - val numEntries = data.getAs[Int](0) - val randCoefficients = data.getAs[Seq[Int]](1).grouped(2) + val data = sparkSession.read.parquet(dataPath).select("randCoefficients").head() + val randCoefficients = data.getAs[Seq[Int]](0).grouped(2) .map(tuple => (tuple(0), tuple(1))).toArray - val model = new MinHashLSHModel(metadata.uid, numEntries, randCoefficients) + val model = new MinHashLSHModel(metadata.uid, randCoefficients) DefaultParamsReader.getAndSetParams(model, metadata) model diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index c70956283cb70..6dde970b85ce5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -39,7 +39,7 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa test("params") { ParamsSuite.checkParams(new MinHashLSH) - val model = new MinHashLSHModel("mh", numEntries = 2, randCoefficients = Array((1, 0))) + val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0))) ParamsSuite.checkParams(model) } @@ -50,7 +50,6 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa test("read/write") { def checkModelData(model: MinHashLSHModel, model2: MinHashLSHModel): Unit = { - assert(model.numEntries === model2.numEntries) assertResult(model.randCoefficients)(model2.randCoefficients) } val mh = new MinHashLSH() @@ -59,25 +58,23 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("hashFunction") { - val model = new MinHashLSHModel("mh", numEntries = 20, - randCoefficients = Array((0, 1), (1, 2), (3, 0))) + val model = new MinHashLSHModel("mh", randCoefficients = Array((0, 1), (1, 2), (3, 0))) val res = model.hashFunction(Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0)))) assert(res.length == 3) assert(res(0).equals(Vectors.dense(1.0))) assert(res(1).equals(Vectors.dense(5.0))) - assert(res(2).equals(Vectors.dense(4.0))) + assert(res(2).equals(Vectors.dense(9.0))) } test("hashFunction: empty vector") { - val model = new MinHashLSHModel("mh", numEntries = 20, - randCoefficients = Array((0, 1), (1, 2), (3, 0))) + val model = new MinHashLSHModel("mh", randCoefficients = Array((0, 1), (1, 2), (3, 0))) intercept[IllegalArgumentException] { model.hashFunction(Vectors.sparse(10, Seq())) } } test("keyDistance") { - val model = new MinHashLSHModel("mh", numEntries = 20, randCoefficients = Array((1, 0))) + val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0))) val v1 = Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (5, 1.0), (7, 1.0))) val v2 = Vectors.sparse(10, Seq((1, 1.0), (3, 1.0), (5, 1.0), (7, 1.0), (9, 1.0))) val keyDist = model.keyDistance(v1, v2) From 939e9d5ca94607604909da0fab6cb5e06865d104 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 22 Nov 2016 10:03:17 -0800 Subject: [PATCH 18/20] Code Review Comments --- .../org/apache/spark/ml/feature/LSH.scala | 50 +++++++++---------- .../apache/spark/ml/feature/MinHashLSH.scala | 17 ++++--- .../BucketedRandomProjectionLSHSuite.scala | 3 +- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala index cd155b65c5038..309cc2ef52b04 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala @@ -50,8 +50,8 @@ private[ml] trait LSHParams extends HasInputCol with HasOutputCol { /** * Transform the Schema for LSH - * @param schema The schema of the input dataset without [[outputCol]] - * @return A derived schema with [[outputCol]] added + * @param schema The schema of the input dataset without [[outputCol]]. + * @return A derived schema with [[outputCol]] added. */ protected[this] final def validateAndTransformSchema(schema: StructType): StructType = { SchemaUtils.appendColumn(schema, $(outputCol), DataTypes.createArrayType(new VectorUDT)) @@ -73,19 +73,19 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] /** * Calculate the distance between two different keys using the distance metric corresponding - * to the hashFunction - * @param x One input vector in the metric space - * @param y One input vector in the metric space - * @return The distance between x and y + * to the hashFunction. + * @param x One input vector in the metric space. + * @param y One input vector in the metric space. + * @return The distance between x and y. */ protected[ml] def keyDistance(x: Vector, y: Vector): Double /** * Calculate the distance between two different hash Vectors. * - * @param x One of the hash vector - * @param y Another hash vector - * @return The distance between hash vectors x and y + * @param x One of the hash vector. + * @param y Another hash vector. + * @return The distance between hash vectors x and y. */ protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double @@ -153,14 +153,14 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * the [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the * transformed data when necessary. * - * NOTE: This method is experimental and will likely change behavior in the next release. + * @note This method is experimental and will likely change behavior in the next release. * - * @param dataset the dataset to search for nearest neighbors of the key - * @param key Feature vector representing the item to search for - * @param numNearestNeighbors The maximum number of nearest neighbors - * @param distCol Output column for storing the distance between each result row and the key - * @return A dataset containing at most k items closest to the key. A distCol is added to show - * the distance between each row and the key. + * @param dataset The dataset to search for nearest neighbors of the key. + * @param key Feature vector representing the item to search for. + * @param numNearestNeighbors The maximum number of nearest neighbors. + * @param distCol Output column for storing the distance between each result row and the key. + * @return A dataset containing at most k items closest to the key. A column "distCol" is added + * to show the distance between each row and the key. */ def approxNearestNeighbors( dataset: Dataset[_], @@ -187,7 +187,7 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * * @param dataset The dataset to transform and explode. * @param explodeCols The alias for the exploded columns, must be a seq of two strings. - * @return A dataset containing idCol, inputCol and explodeCols + * @return A dataset containing idCol, inputCol and explodeCols. 
*/ private[this] def processDataset( dataset: Dataset[_], @@ -206,9 +206,9 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] /** * Recreate a column using the same column name but different attribute id. Used in approximate * similarity join. - * @param dataset The dataset where a column need to recreate - * @param colName The name of the column to recreate - * @param tmpColName A temporary column name which does not conflict with existing columns + * @param dataset The dataset where a column need to recreate. + * @param colName The name of the column to recreate. + * @param tmpColName A temporary column name which does not conflict with existing columns. * @return */ private[this] def recreateCol( @@ -227,12 +227,12 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] * [[outputCol]] exists, it will use the [[outputCol]]. This allows caching of the transformed * data when necessary. * - * @param datasetA One of the datasets to join - * @param datasetB Another dataset to join - * @param threshold The threshold for the distance of row pairs - * @param distCol Output column for storing the distance between each result row and the key + * @param datasetA One of the datasets to join. + * @param datasetB Another dataset to join. + * @param threshold The threshold for the distance of row pairs. + * @param distCol Output column for storing the distance between each result row and the key. * @return A joined dataset containing pairs of rows. The original rows are in columns - * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair + * "datasetA" and "datasetB", and a distCol is added to show the distance of each pair. */ def approxSimilarityJoin( datasetA: Dataset[_], diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 123dd21c408df..35d300565a23b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -31,15 +31,16 @@ import org.apache.spark.sql.types.StructType /** * :: Experimental :: * - * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function is - * picked from a hash family for a specific set `S` with cardinality equal to `numEntries`: - * `h_i(x) = ((x \cdot a_i + b_i) \mod prime) \mod numEntries` + * Model produced by [[MinHashLSH]], where multiple hash functions are stored. Each hash function + * is picked from the following family of hash functions, where a_i and b_i are randomly chosen + * integers less than prime: + * `h_i(x) = ((x \cdot a_i + b_i) \mod prime)` * * This hash family is approximately min-wise independent according to the reference. * * Reference: - * [[http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.121.8215&rep=rep1&type=pdf Min-wise - * independent permutations]] + * Tom Bohman, Colin Cooper, and Alan Frieze. "Min-wise independent linear permutations." + * Electronic Journal of Combinatorics 7 (2000): R26. * * @param randCoefficients Pairs of random coefficients. Each pair is used by one hash function. 
*/ @@ -55,11 +56,11 @@ class MinHashLSHModel private[ml]( elems: Vector => { require(elems.numNonzeros > 0, "Must have at least 1 non zero entry.") val elemsList = elems.toSparse.indices.toList - val hashValues = randCoefficients.map({ case (a: Int, b: Int) => + val hashValues = randCoefficients.map { case (a, b) => elemsList.map { elem: Int => ((1 + elem) * a + b) % MinHashLSH.HASH_PRIME }.min.toDouble - }) + } // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 hashValues.grouped(1).map(Vectors.dense).toArray } @@ -132,7 +133,7 @@ class MinHashLSH(override val uid: String) extends LSH[MinHashLSHModel] with Has require(inputDim <= MinHashLSH.HASH_PRIME, s"The input vector dimension $inputDim exceeds the threshold ${MinHashLSH.HASH_PRIME}.") val rand = new Random($(seed)) - val randCoefs: Array[(Int, Int)] = Array.fill(2 * $(numHashTables)) { + val randCoefs: Array[(Int, Int)] = Array.fill($(numHashTables)) { (1 + rand.nextInt(MinHashLSH.HASH_PRIME - 1), rand.nextInt(MinHashLSH.HASH_PRIME - 1)) } new MinHashLSHModel(uid, randCoefs) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 15af19fc7d003..4a644fc58946a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -57,8 +57,7 @@ class BucketedRandomProjectionLSHSuite test("read/write") { def checkModelData( model: BucketedRandomProjectionLSHModel, - model2: BucketedRandomProjectionLSHModel - ): Unit = { + model2: BucketedRandomProjectionLSHModel): Unit = { model.randUnitVectors.zip(model2.randUnitVectors) .foreach(pair => assert(pair._1 === pair._2)) } From 8b9403d0a27928f945b6142e579a6b60f70c117f Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 22 Nov 2016 10:50:57 -0800 Subject: [PATCH 19/20] Minimize the test cases by directly using artificial models --- .../ml/feature/BucketedRandomProjectionLSHSuite.scala | 8 ++------ .../org/apache/spark/ml/feature/MinHashLSHSuite.scala | 7 +------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala index 4a644fc58946a..ab937685a555c 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala @@ -164,13 +164,9 @@ class BucketedRandomProjectionLSHSuite test("approxNearestNeighbors for numNeighbors <= 0") { val key = Vectors.dense(1.2, 3.4) - val brp = new BucketedRandomProjectionLSH() - .setInputCol("keys") - .setOutputCol("values") - .setBucketLength(1.0) - .setSeed(12345) + val model = new BucketedRandomProjectionLSHModel( + "brp", randUnitVectors = Array(Vectors.dense(1.0, 0.0))) - val model = brp.fit(dataset) intercept[IllegalArgumentException] { model.approxNearestNeighbors(dataset, key, 0) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala index 6dde970b85ce5..3461cdf82460f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala @@ -124,16 +124,11 @@ class 
MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa } test("approxNearestNeighbors for numNeighbors <= 0") { - val mh = new MinHashLSH() - .setNumHashTables(20) - .setInputCol("keys") - .setOutputCol("values") - .setSeed(12345) + val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0))) val key: Vector = Vectors.sparse(100, (0 until 100).filter(_.toString.contains("1")).map((_, 1.0))) - val model = mh.fit(dataset) intercept[IllegalArgumentException] { model.approxNearestNeighbors(dataset, key, 0) } From f0ebcb736634c02c59bc50760c53dfcad21fc5d9 Mon Sep 17 00:00:00 2001 From: Yun Ni Date: Tue, 22 Nov 2016 13:35:12 -0800 Subject: [PATCH 20/20] Code review comments --- .../spark/ml/feature/BucketedRandomProjectionLSH.scala | 2 +- .../main/scala/org/apache/spark/ml/feature/MinHashLSH.scala | 2 +- .../src/test/scala/org/apache/spark/ml/feature/LSHTest.scala | 5 +++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index a3407d4d82fd0..1f2ae785c22c5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -80,7 +80,7 @@ class BucketedRandomProjectionLSHModel private[ml]( randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength)) }) // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 - hashValues.grouped(1).map(Vectors.dense).toArray + hashValues.map(Vectors.dense(_)) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index 35d300565a23b..04279eada087b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -62,7 +62,7 @@ class MinHashLSHModel private[ml]( }.min.toDouble } // TODO: Output vectors of dimension numHashFunctions in SPARK-18450 - hashValues.grouped(1).map(Vectors.dense).toArray + hashValues.map(Vectors.dense(_)) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala index 85b2c6c1cf8f3..a9b559f7ba648 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/LSHTest.scala @@ -58,9 +58,14 @@ private[ml] object LSHTest { val outputCol = model.getOutputCol val transformedData = model.transform(dataset) + // Check output column type SchemaUtils.checkColumnType( transformedData.schema, model.getOutputCol, DataTypes.createArrayType(new VectorUDT)) + // Check output column dimensions + val headHashValue = transformedData.select(outputCol).head().get(0).asInstanceOf[Seq[Vector]] + assert(headHashValue.length == model.getNumHashTables) + // Perform a cross join and label each pair of same_bucket and distance val pairs = transformedData.as("a").crossJoin(transformedData.as("b")) val distUDF = udf((x: Vector, y: Vector) => model.keyDistance(x, y), DataTypes.DoubleType)
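
For reference, below is a minimal usage sketch of the API that the patches above converge on. It is not part of the patch series: the SparkSession setup, the column names ("id", "keys", "values"), the seed, the sample vectors, and the 0.6 Jaccard-distance threshold are assumptions chosen only for illustration, while the class and method names (MinHashLSH, setNumHashTables, fit, approxNearestNeighbors, approxSimilarityJoin) are the ones added or renamed in these commits.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object MinHashLSHSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MinHashLSHSketch").getOrCreate()

    // Each row is a set encoded as a sparse binary vector over a space of 10 elements;
    // MinHashLSH requires every input vector to have at least one non-zero entry.
    val data = Seq(
      (0, Vectors.sparse(10, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
      (1, Vectors.sparse(10, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
      (2, Vectors.sparse(10, Seq((0, 1.0), (2, 1.0), (4, 1.0)))))
    val df = spark.createDataFrame(data).toDF("id", "keys")

    val mh = new MinHashLSH()
      .setNumHashTables(5)   // OR-amplification: more hash tables lower the false negative rate
      .setInputCol("keys")
      .setOutputCol("values")
      .setSeed(12345)
    val model = mh.fit(df)

    // Approximate nearest neighbors of a query set (at most 2 rows returned).
    val key = Vectors.sparse(10, Seq((1, 1.0), (2, 1.0)))
    model.approxNearestNeighbors(df, key, 2).show()

    // Approximate self-join on Jaccard distance, keeping pairs below the 0.6 threshold.
    model.approxSimilarityJoin(df, df, 0.6, "JaccardDistance").show()

    spark.stop()
  }
}

The sketch follows the same pattern as the test suites in the patches (createDataFrame over a Seq of tuples, then fit and query the model); hash values land in the "values" output column as an array of one-dimensional vectors, one per hash table, as noted in the SPARK-18450 TODOs above.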