Skip to content

Commit 2f66ad1

Browse files
author
Vivek Bhaskar
authored
Provide preferred location for each bucket-id in case of partitioned sample table. (#22)
These changes are related to AQP-79. They provide a preferred location for each bucket ID in the case of a partitioned sample table.
1 parent d80ef1b commit 2f66ad1

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,12 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
4242
prev = null
4343
}
4444
}
45+
46+
/**
 * A [[MapPartitionsRDD]] whose preferred locations are supplied by the caller instead of
 * being taken from the parent class.
 *
 * @param prev the parent RDD, forwarded to [[MapPartitionsRDD]]
 * @param f function of (TaskContext, partition index, iterator) applied to each partition;
 *          forwarded to [[MapPartitionsRDD]]
 * @param preservesPartitioning forwarded unchanged to [[MapPartitionsRDD]]
 * @param p maps a partition index to the location strings reported as preferred for that
 *          partition
 * @tparam U element type of this RDD
 * @tparam T element type of the parent RDD
 */
private[spark] final class PreserveLocationsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false,
    p: Int => Seq[String])
  extends MapPartitionsRDD[U, T](prev, f, preservesPartitioning) {

  /** Looks up the preferred locations for `split` by its index using `p`. */
  override def getPreferredLocations(split: Partition): Seq[String] = {
    val partitionIndex = split.index
    p(partitionIndex)
  }
}

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,17 @@ abstract class RDD[T: ClassTag](
821821
preservesPartitioning)
822822
}
823823

824+
/**
 * Return a new RDD by applying `f` to each partition of this RDD together with the
 * partition's index, and report `p(index)` as the preferred locations of each resulting
 * partition (via [[PreserveLocationsRDD]]).
 *
 * @param f function of (partition index, partition iterator) producing the output iterator
 * @param p maps a partition index to the preferred location strings for that partition
 * @param preservesPartitioning passed through unchanged to [[PreserveLocationsRDD]]
 * @tparam U element type of the resulting RDD
 * @return a new RDD of the elements produced by `f`
 */
def mapPartitionsWithIndexPreserveLocations[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    p: (Int) => Seq[String],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  // `p` is stored inside the resulting RDD and used by getPreferredLocations, so it gets
  // the same closure cleaning as `f` rather than being passed through as-is.
  val cleanedP = sc.clean(p)
  new PreserveLocationsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning,
    cleanedP)
}
834+
824835
/**
825836
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
826837
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of

0 commit comments

Comments
 (0)