Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{FlatMapFunction, Function => JFunction,
Expand Down Expand Up @@ -937,6 +938,34 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions))
}

/**
* Return a RDD containing only the elements in the inclusive range `lower` to `upper`.
* If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
* performed efficiently by only scanning the partitions that might containt matching elements.
* Otherwise, a standard `filter` is applied to all partitions.
*
* @since 3.1.0
*/
@Since("3.1.0")
def filterByRange(lower: K, upper: K): JavaPairRDD[K, V] = {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll want the @Since("3.1.0") annotations on these methods

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I am also wondering whether it makes sense to backport this in 2.4 by the way?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not a bug fix per se. I wouldn't put it in 3.0 even necessarily.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your first contribution, @wetneb ! +1 for @srowen 's answers.

@yaooqinn yaooqinn Apr 22, 2020

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for curiosity, of the two since marks, Spark's scala @Since annotation and java @since tag, how to choose them? IMHO, @since tag seems better here to let the version show up in the generated Java API documentation. @dongjoon-hyun @srowen thanks.

@dongjoon-hyun dongjoon-hyun Apr 22, 2020

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the generated Java API doc, you should put @since into the comment.

  /**
   * Return a RDD containing only the elements in the inclusive range `lower` to `upper`.
   * If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
   * performed efficiently by only scanning the partitions that might containt matching elements.
   * Otherwise, a standard `filter` is applied to all partitions.
   *
   * @since 3.1.0
   */

For example,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have added that too.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's right, my mistake, I was thinking of Scala

val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
filterByRange(comp, lower, upper)
}

/**
* Return a RDD containing only the elements in the inclusive range `lower` to `upper`.
* If the RDD has been partitioned using a `RangePartitioner`, then this operation can be
* performed efficiently by only scanning the partitions that might containt matching elements.
* Otherwise, a standard `filter` is applied to all partitions.
*
* @since 3.1.0
*/
@Since("3.1.0")
def filterByRange(comp: Comparator[K], lower: K, upper: K): JavaPairRDD[K, V] = {
implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).filterByRange(lower, upper))
}

/**
* Return an RDD with the keys of each tuple.
*/
Expand Down
28 changes: 28 additions & 0 deletions core/src/test/java/test/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,34 @@ public int getPartition(Object key) {
Arrays.asList(new Tuple2<>(1, 3), new Tuple2<>(3, 8), new Tuple2<>(3, 8)));
}

@Test
public void filterByRange() {
List<Tuple2<Integer, Integer>> pairs = new ArrayList<>();
pairs.add(new Tuple2<>(0, 5));
pairs.add(new Tuple2<>(1, 8));
pairs.add(new Tuple2<>(2, 6));
pairs.add(new Tuple2<>(3, 8));
pairs.add(new Tuple2<>(4, 8));

JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs).sortByKey();

// Default comparator
JavaPairRDD<Integer, Integer> filteredRDD = rdd.filterByRange(3, 11);
List<Tuple2<Integer, Integer>> filteredPairs = filteredRDD.collect();
assertEquals(filteredPairs.size(), 2);
assertEquals(filteredPairs.get(0), new Tuple2<>(3, 8));
assertEquals(filteredPairs.get(1), new Tuple2<>(4, 8));

// Custom comparator
filteredRDD = rdd.filterByRange(Collections.reverseOrder(), 3, -2);
filteredPairs = filteredRDD.collect();
assertEquals(filteredPairs.size(), 4);
assertEquals(filteredPairs.get(0), new Tuple2<>(0, 5));
assertEquals(filteredPairs.get(1), new Tuple2<>(1, 8));
assertEquals(filteredPairs.get(2), new Tuple2<>(2, 6));
assertEquals(filteredPairs.get(3), new Tuple2<>(3, 8));
}

@Test
public void emptyRDD() {
JavaRDD<String> rdd = sc.emptyRDD();
Expand Down