@@ -39,6 +39,7 @@ import org.apache.spark.api.java.function.{Function => JFunction, Function2 => J
3939import org .apache .spark .partial .{BoundedDouble , PartialResult }
4040import org .apache .spark .rdd .{OrderedRDDFunctions , RDD }
4141import org .apache .spark .rdd .RDD .rddToPairRDDFunctions
42+ import org .apache .spark .serializer .Serializer
4243import org .apache .spark .storage .StorageLevel
4344import org .apache .spark .util .Utils
4445
@@ -227,24 +228,51 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
227228 * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
228229 * - `mergeCombiners`, to combine two C's into a single one.
229230 *
230- * In addition, users can control the partitioning of the output RDD, and whether to perform
231- * map-side aggregation (if a mapper can produce multiple items with the same key).
231+ * In addition, users can control the partitioning of the output RDD, the serializer that is used
232+ * for the shuffle, and whether to perform map-side aggregation (if a mapper can produce multiple
233+ * items with the same key).
232234 */
233235 def combineByKey [C ](createCombiner : JFunction [V , C ],
234- mergeValue : JFunction2 [C , V , C ],
235- mergeCombiners : JFunction2 [C , C , C ],
236- partitioner : Partitioner ): JavaPairRDD [K , C ] = {
237- implicit val ctag : ClassTag [C ] = fakeClassTag
236+ mergeValue : JFunction2 [C , V , C ],
237+ mergeCombiners : JFunction2 [C , C , C ],
238+ partitioner : Partitioner ,
239+ mapSideCombine : Boolean ,
240+ serializer : Serializer ): JavaPairRDD [K , C ] = {
241+ implicit val ctag : ClassTag [C ] = fakeClassTag
238242 fromRDD(rdd.combineByKey(
239243 createCombiner,
240244 mergeValue,
241245 mergeCombiners,
242- partitioner
246+ partitioner,
247+ mapSideCombine,
248+ serializer
243249 ))
244250 }
245251
246252 /**
247- * Simplified version of combineByKey that hash-partitions the output RDD.
253+ * Generic function to combine the elements for each key using a custom set of aggregation
254+ * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
255+ * "combined type" C. Note that V and C can be different -- for example, one might group an
256+ * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
257+ * functions:
258+ *
259+ * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
260+ * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
261+ * - `mergeCombiners`, to combine two C's into a single one.
262+ *
263+ * In addition, users can control the partitioning of the output RDD. This method automatically
264+ * uses map-side aggregation in shuffling the RDD.
265+ */
266+ def combineByKey [C ](createCombiner : JFunction [V , C ],
267+ mergeValue : JFunction2 [C , V , C ],
268+ mergeCombiners : JFunction2 [C , C , C ],
269+ partitioner : Partitioner ): JavaPairRDD [K , C ] = {
270+ combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, true , null )
271+ }
272+
273+ /**
274+ * Simplified version of combineByKey that hash-partitions the output RDD and uses map-side
275+ * aggregation.
248276 */
249277 def combineByKey [C ](createCombiner : JFunction [V , C ],
250278 mergeValue : JFunction2 [C , V , C ],
@@ -488,7 +516,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
488516
489517 /**
490518 * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
491- * partitioner/parallelism level.
519+ * partitioner/parallelism level and using map-side aggregation.
492520 */
493521 def combineByKey [C ](createCombiner : JFunction [V , C ],
494522 mergeValue : JFunction2 [C , V , C ],
0 commit comments