1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .spark .mllib .regression
19+
20+ import org .apache .spark .mllib .linalg .Vector
21+ import org .apache .spark .rdd .RDD
22+
/**
 * Ordering contract that an isotonic regression fit must satisfy between
 * consecutive data points (sorted by feature value).
 */
sealed trait MonotonicityConstraint {
  /** Returns true if the labels of `current` and `next` satisfy the constraint. */
  def holds(current: LabeledPoint, next: LabeledPoint): Boolean
}
26+
/** Constraint requiring labels to be non-decreasing in feature order. */
case object Isotonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean =
    current.label <= next.label
}
/** Constraint requiring labels to be non-increasing in feature order. */
case object Antitonic extends MonotonicityConstraint {
  override def holds(current: LabeledPoint, next: LabeledPoint): Boolean =
    current.label >= next.label
}
37+
/**
 * Regression model for isotonic (monotone) regression.
 *
 * @param predictions predicted (label, feature) points, sorted by feature
 *                    value, as produced by the pool adjacent violators
 *                    algorithm (not per-feature weights)
 * @param monotonicityConstraint the ordering (Isotonic or Antitonic) that the
 *                               predictions satisfy
 */
class IsotonicRegressionModel(
    val predictions: Seq[LabeledPoint],
    val monotonicityConstraint: MonotonicityConstraint)
  extends RegressionModel {

  /** Predicts a label for every feature vector in the RDD. */
  override def predict(testData: RDD[Vector]): RDD[Double] =
    testData.map(predict)

  /**
   * Predicts a label for a single feature vector: the label of the known
   * point with the greatest feature value not exceeding the query.
   * Only the first component of the feature vector is considered.
   */
  override def predict(testData: Vector): Double = {
    val feature = testData.toArray.head
    // Prepend the first prediction so that queries smaller than every known
    // feature fall back to the lowest-feature label.
    val candidates =
      predictions.head +: predictions.filter(_.features.toArray.head <= feature)
    candidates.last.label
  }
}
54+
/** Interface for algorithms that fit isotonic regression models. */
trait IsotonicRegressionAlgorithm extends Serializable {

  /** Wraps already-fitted predictions into a model. */
  protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  /** Fits a model to the input data under the given constraint. */
  def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel

  /** Fits a model to the input data, starting from the given initial weights. */
  def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel
}
62+
/**
 * Fits isotonic regression models using the pool adjacent violators algorithm
 * (PAVA), parallelized by running PAVA on each partition and then once more
 * over the collected partial results.
 */
class PoolAdjacentViolators extends IsotonicRegressionAlgorithm {

  override def run(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    createModel(
      parallelPoolAdjacentViolators(input, monotonicityConstraint),
      monotonicityConstraint)

  /**
   * PAVA has no tunable starting point, so `initialWeights` is ignored and
   * this behaves exactly like `run(input, monotonicityConstraint)`.
   * (Previously unimplemented and threw `NotImplementedError`.)
   */
  override def run(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    run(input, monotonicityConstraint)

  override protected def createModel(
      weights: Seq[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel =
    new IsotonicRegressionModel(weights, monotonicityConstraint)

  /**
   * Performs the pool adjacent violators algorithm (PAVA).
   *
   * Processes the data in a single pass; violations in already-processed data
   * introduced by pooling are fixed immediately. Discovers whole
   * monotonicity-violating sequences before pooling them.
   *
   * Note: mutates the input array in place and returns it.
   */
  private def poolAdjacentViolators(
      in: Array[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {

    // Pools the sub-array within [start, end], assigning the average label to
    // every element (all points carry equal weight here).
    def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
      val poolSubArray = in.slice(start, end + 1)

      val weightedSum = poolSubArray.map(_.label).sum
      val weight = poolSubArray.length

      for (i <- start to end) {
        in(i) = LabeledPoint(weightedSum / weight, in(i).features)
      }
    }

    var i = 0

    while (i < in.length) {
      var j = i

      // Find a monotonicity-violating sequence starting at i, if any.
      while (j < in.length - 1 && !monotonicityConstraint.holds(in(j), in(j + 1))) {
        j = j + 1
      }

      if (i == j) {
        // No violation at i: move to the next data point.
        i = i + 1
      } else {
        // Pool the violating sequence [i, j]; if pooling breaks monotonicity
        // with previously processed points, extend the pool backwards.
        while (i >= 0 && !monotonicityConstraint.holds(in(i), in(i + 1))) {
          pool(in, i, j)
          i = i - 1
        }

        i = j
      }
    }

    in
  }

  /**
   * Runs PAVA on each partition in parallel, collects the partial results to
   * the driver, and runs one final PAVA pass over the feature-sorted whole.
   */
  private def parallelPoolAdjacentViolators(
      testData: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): Seq[LabeledPoint] = {
    poolAdjacentViolators(
      testData
        .sortBy(_.features.toArray.head)
        .mapPartitions(it => poolAdjacentViolators(it.toArray, monotonicityConstraint).toIterator)
        .collect(),
      monotonicityConstraint)
  }
}
128+
/**
 * Top-level methods for calling IsotonicRegression.
 */
object IsotonicRegression {

  /**
   * Trains an isotonic regression model on an RDD of labeled points using the
   * pool adjacent violators algorithm, subject to the given monotonicity
   * constraint.
   *
   * @param input RDD of (label, array of features) pairs; only the first
   *              feature component is used for ordering
   * @param initialWeights initial weights, passed through to the underlying
   *                       algorithm
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic
   *                               (non-increasing)
   */
  def train(
      input: RDD[LabeledPoint],
      initialWeights: Vector,
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, initialWeights, monotonicityConstraint)
  }

  /**
   * Trains an isotonic regression model on an RDD of labeled points using the
   * pool adjacent violators algorithm, subject to the given monotonicity
   * constraint.
   *
   * @param input RDD of (label, array of features) pairs; only the first
   *              feature component is used for ordering
   * @param monotonicityConstraint Isotonic (non-decreasing) or Antitonic
   *                               (non-increasing)
   */
  def train(
      input: RDD[LabeledPoint],
      monotonicityConstraint: MonotonicityConstraint): IsotonicRegressionModel = {
    new PoolAdjacentViolators().run(input, monotonicityConstraint)
  }
}
166+
// NOTE(review): the commented-out blocks below are earlier experimental PAVA
// variants kept for reference; consider removing them before merge.
/* def functionalOption(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
168+ def pool2(in: Array[LabeledPoint]): Array[LabeledPoint] =
169+ in.map(p => LabeledPoint(in.map(_.label).sum / in.length, p.features))
170+
171+ def iterate(checked: Array[LabeledPoint], remaining: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint): Array[LabeledPoint] = {
172+ if(remaining.size < 2) {
173+ checked ++ remaining
174+ } else {
175+ val newRemaining = if(remaining.size == 2) Array[LabeledPoint]() else remaining.slice(2, remaining.length)
176+
177+ if(!monotonicityConstraint.holds(remaining.head, remaining.tail.head)) {
178+ iterate(checked ++ pool2(remaining.slice(0, 2)), newRemaining, monotonicityConstraint)
179+ } else {
180+ iterate(checked ++ remaining.slice(0, 2), newRemaining, monotonicityConstraint)
181+ }
182+ }
183+ }
184+
185+ iterate(Array(), in, monotonicityConstraint)
186+ }
187+
188+
189+ functionalOption(in, monotonicityConstraint)*/
190+
191+ /* def option1(in: Array[LabeledPoint], monotonicityConstraint: MonotonicityConstraint) = {
192+ def findMonotonicityViolators(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Unit = {
193+ var j = start
194+
195+ while (j >= 1 && !monotonicityConstraint.holds(in(j - 1), in(j))) {
196+ pool(in, j - 1, start + 1)
197+ j = j - 1
198+ }
199+ }
200+
201+ for (i <- 0 to in.length - 1) {
202+ findMonotonicityViolators(in, i, monotonicityConstraint)
203+ }
204+
205+ in
206+ }*/
207+
208+ /*
209+ def pool(in: Array[LabeledPoint], start: Int, end: Int): Unit = {
210+ val subArraySum = in.slice(start, end).map(_.label).sum
211+ val subArrayLength = math.abs(start - end)
212+
213+ for(i <- start to end - 1) {
214+ in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
215+ }
216+ }*/
217+
218+
219+
220+ /*
221+ OPTION 2
222+ def pool(in: Array[LabeledPoint], range: Range): Unit = {
223+ val subArray = in.slice(range.start, range.end + 1)
224+
225+ val subArraySum = subArray.map(_.label).sum
226+ val subArrayLength = subArray.length
227+
228+ for(i <- range.start to range.end) {
229+ in(i) = LabeledPoint(subArraySum / subArrayLength, in(i).features)
230+ }
231+ }
232+
233+ def poolExtendedViolators(in: Array[LabeledPoint], range: Range, monotonicityConstraint: MonotonicityConstraint): Unit = {
234+ var extendedRange = Range(range.start, range.end)
235+
236+ while (extendedRange.start >= 0 && !monotonicityConstraint.holds(in(extendedRange.start), in(extendedRange.start + 1))) {
237+ pool(in, Range(extendedRange.start, extendedRange.end))
238+ extendedRange = Range(extendedRange.start - 1, extendedRange.end)
239+ }
240+ }
241+
242+ def findViolatingSequence(in: Array[LabeledPoint], start: Int, monotonicityConstraint: MonotonicityConstraint): Option[Range] = {
243+ var j = start
244+
245+ while(j < in.length - 1 && !monotonicityConstraint.holds(in(start), in(j + 1))) {
246+ j = j + 1
247+ }
248+
249+ if(j == start) {
250+ None
251+ } else {
252+ Some(Range(start, j))
253+ }
254+ }
255+
256+ var i = 0;
257+
258+ while(i < in.length) {
259+ findViolatingSequence(in, i, monotonicityConstraint).fold[Unit]({
260+ i = i + 1
261+ })(r => {
262+ poolExtendedViolators(in, r, monotonicityConstraint)
263+ i = r.end
264+ })
265+ }
266+
267+ in
268+ */
// (removed stray scraping artifact: "0 commit comments")