Skip to content

Commit e0f7fb7

Browse files
jerryshaotdas
authored andcommitted
[SPARK-5315][Streaming] Fix reduceByWindow Java API not work bug
`reduceByWindow` for Java API is actually not Java compatible, change to make it Java compatible. Current solution is to deprecate the old one and add a new API, but since old API actually is not correct, so is keeping the old one meaningful? just to keep the binary compatible? Also even adding new API still need to add to Mima exclusion, I'm not sure to change the API, or deprecate the old API and add a new one, which is the best solution? Author: jerryshao <[email protected]> Closes apache#4104 from jerryshao/SPARK-5315 and squashes the following commits: 5bc8987 [jerryshao] Address the comment c7aa1b4 [jerryshao] Deprecate the old one to keep binary compatible 8e9dc67 [jerryshao] Fix JavaDStream reduceByWindow signature error
1 parent 3c3fa63 commit e0f7fb7

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

project/MimaExcludes.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ object MimaExcludes {
9090
// SPARK-5297 Java FileStream do not work with custom key/values
9191
ProblemFilters.exclude[MissingMethodProblem](
9292
"org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
93+
) ++ Seq(
94+
// SPARK-5315 Spark Streaming Java API returns Scala DStream
95+
ProblemFilters.exclude[MissingMethodProblem](
96+
"org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
9397
)
9498

9599
case v if v.startsWith("1.2") =>

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
211211
* @param slideDuration sliding interval of the window (i.e., the interval after which
212212
* the new DStream will generate RDDs); must be a multiple of this
213213
* DStream's batching interval
214+
* @deprecated As this API is not Java compatible.
214215
*/
216+
@deprecated("Use Java-compatible version of reduceByWindow", "1.3.0")
215217
def reduceByWindow(
216218
reduceFunc: (T, T) => T,
217219
windowDuration: Duration,
@@ -220,6 +222,24 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
220222
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
221223
}
222224

225+
/**
226+
* Return a new DStream in which each RDD has a single element generated by reducing all
227+
* elements in a sliding window over this DStream.
228+
* @param reduceFunc associative reduce function
229+
* @param windowDuration width of the window; must be a multiple of this DStream's
230+
* batching interval
231+
* @param slideDuration sliding interval of the window (i.e., the interval after which
232+
* the new DStream will generate RDDs); must be a multiple of this
233+
* DStream's batching interval
234+
*/
235+
def reduceByWindow(
236+
reduceFunc: JFunction2[T, T, T],
237+
windowDuration: Duration,
238+
slideDuration: Duration
239+
): JavaDStream[T] = {
240+
dstream.reduceByWindow(reduceFunc, windowDuration, slideDuration)
241+
}
242+
223243
/**
224244
* Return a new DStream in which each RDD has a single element generated by reducing all
225245
* elements in a sliding window over this DStream. However, the reduction is done incrementally

streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,17 @@ public void testReduce() {
306306

307307
@SuppressWarnings("unchecked")
308308
@Test
309-
public void testReduceByWindow() {
309+
public void testReduceByWindowWithInverse() {
310+
testReduceByWindow(true);
311+
}
312+
313+
@SuppressWarnings("unchecked")
314+
@Test
315+
public void testReduceByWindowWithoutInverse() {
316+
testReduceByWindow(false);
317+
}
318+
319+
private void testReduceByWindow(boolean withInverse) {
310320
List<List<Integer>> inputData = Arrays.asList(
311321
Arrays.asList(1,2,3),
312322
Arrays.asList(4,5,6),
@@ -319,8 +329,14 @@ public void testReduceByWindow() {
319329
Arrays.asList(24));
320330

321331
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
322-
JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
332+
JavaDStream<Integer> reducedWindowed = null;
333+
if (withInverse) {
334+
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
323335
new IntegerDifference(), new Duration(2000), new Duration(1000));
336+
} else {
337+
reducedWindowed = stream.reduceByWindow(new IntegerSum(),
338+
new Duration(2000), new Duration(1000));
339+
}
324340
JavaTestUtils.attachTestOutputStream(reducedWindowed);
325341
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
326342

0 commit comments

Comments
 (0)