Skip to content

Commit 726bcd5

Browse files
Vivek BhaskarSumedh Wale
authored andcommitted
[SNAPPYDATA] Provide preferred location for each bucket-id in case of partitioned sample table. (#22)
These changes are related to AQP-79. Provide preferred location for each bucket-id in case of partitioned sample table.
1 parent 0cc6dfd commit 726bcd5

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
4242
prev = null
4343
}
4444
}
45+
46+
private[spark] final class PreserveLocationsRDD[U: ClassTag, T: ClassTag](
47+
prev: RDD[T],
48+
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
49+
preservesPartitioning: Boolean = false, p: (Int) => Seq[String])
50+
extends MapPartitionsRDD[U, T](prev, f, preservesPartitioning) {
51+
52+
override def getPreferredLocations(split: Partition): Seq[String] = p(split.index)
53+
}

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,6 +844,17 @@ abstract class RDD[T: ClassTag](
844844
preservesPartitioning)
845845
}
846846

847+
def mapPartitionsWithIndexPreserveLocations[U: ClassTag](
848+
f: (Int, Iterator[T]) => Iterator[U],
849+
p: (Int) => Seq[String],
850+
preservesPartitioning: Boolean = false): RDD[U] = withScope {
851+
val cleanedF = sc.clean(f)
852+
new PreserveLocationsRDD(
853+
this,
854+
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
855+
preservesPartitioning, p)
856+
}
857+
847858
/**
848859
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
849860
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of

0 commit comments

Comments
 (0)