From 38cf0f3cc90d09435f12ad86021469eff32985db Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Thu, 31 Mar 2016 23:17:43 -0400
Subject: [PATCH 1/2] change reduce to treeReduce for lda

---
 .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 7491ab0d51ca..1ffa8eb3012c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -451,10 +451,10 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
       Iterator((stat, gammaPart))
     }
-    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
+    val statsSum: BDM[Double] = stats.map(_._1).treeReduce(_ += _)
     expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
+      stats.map(_._2).treeReduce(_ ++ _).map(_.toDenseMatrix): _*)
     val batchResult = statsSum :* expElogbeta.t
 
     // Note that this is an optimization to avoid batch.count

From 42ae4692bb483f85c4fc46425ac312d2657ba451 Mon Sep 17 00:00:00 2001
From: Yuhao Yang
Date: Wed, 6 Apr 2016 10:32:28 +0800
Subject: [PATCH 2/2] use treeaggregate

---
 .../org/apache/spark/mllib/clustering/LDAOptimizer.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 1ffa8eb3012c..2b404a865118 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -451,10 +451,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
       Iterator((stat, gammaPart))
     }
-    val statsSum: BDM[Double] = stats.map(_._1).treeReduce(_ += _)
+    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
+      _ += _, _ += _)
     expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).treeReduce(_ ++ _).map(_.toDenseMatrix): _*)
+      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
     val batchResult = statsSum :* expElogbeta.t
 
     // Note that this is an optimization to avoid batch.count
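
Note, not part of the patches above: a minimal standalone Scala sketch of the idea behind the second patch, i.e. summing per-partition Breeze matrices with treeAggregate so partial sums are merged on the executors in a tree instead of shipping every partition's matrix to the driver, as a plain reduce does. The object name, the toy RDD, and the values of k and vocabSize here are illustrative assumptions, not code taken from LDAOptimizer.

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.{SparkConf, SparkContext}

object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("tree-aggregate-sketch"))
    val k = 3          // stand-in for the topic count
    val vocabSize = 5  // stand-in for the vocabulary size

    // One matrix per element, standing in for per-partition sufficient statistics.
    val stats = sc.parallelize(Seq.fill(64)(BDM.ones[Double](k, vocabSize)), 64)

    // reduce: every partition's result is shipped to the driver and merged there.
    val viaReduce: BDM[Double] = stats.reduce(_ + _)

    // treeAggregate: partial sums are first combined on the executors in a tree
    // (default depth 2), so the driver only merges a handful of matrices.
    val viaTree: BDM[Double] = stats.treeAggregate(BDM.zeros[Double](k, vocabSize))(
      (acc, m) => acc += m,   // fold each matrix into the per-partition accumulator
      (a, b) => a += b)       // merge accumulators pairwise up the tree

    println(breeze.linalg.sum(viaReduce))  // 64 * k * vocabSize = 960.0
    println(breeze.linalg.sum(viaTree))    // same total, less driver-side merging
    sc.stop()
  }
}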