9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -168,6 +168,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
}

/** Set multiple parameters together */
def setAll(settings: Iterable[(String, String)]): SparkConf = {
Member Author:
Any caller is almost certainly passing an object that is Iterable as well as Traversable, but this maintains binary compatibility for now.
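To illustrate, here is a minimal standalone sketch (not the actual SparkConf; the `Conf` class and member names are just illustrative) of how the new `Iterable` overload sits next to the retained `Traversable` one:

```scala
// Sketch only: a trimmed-down stand-in for SparkConf showing the overload pair.
class Conf {
  private val settings = scala.collection.mutable.Map[String, String]()

  def set(k: String, v: String): Conf = { settings(k) = v; this }

  /** New overload: more specific, so ordinary collections bind here at compile time. */
  def setAll(kvs: Iterable[(String, String)]): Conf = {
    kvs.foreach { case (k, v) => set(k, v) }
    this
  }

  /** Old signature retained only so already-compiled callers keep linking. */
  @deprecated("Use setAll(Iterable) instead", "3.0.0")
  def setAll(kvs: Traversable[(String, String)]): Conf = {
    kvs.foreach { case (k, v) => set(k, v) }
    this
  }
}
```

Recompiled callers resolve to the `Iterable` version, while bytecode compiled against the old signature still finds the `Traversable` method at link time.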

settings.foreach { case (k, v) => set(k, v) }
this
}

/**
* Set multiple parameters together
*/
@deprecated("Use setAll(Iterable) instead", "3.0.0")
Member Author:
Hm, is it even worth keeping and deprecating this and removing it later? It'll be source-compatible already, and binary compatibility isn't guaranteed; one would have to go out of one's way to call the deprecated method at all, with a cast or with some odd custom collection.
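For example (reusing the hypothetical `Conf` sketch above), hitting the deprecated overload after a recompile takes a deliberate upcast or a custom collection:

```scala
val conf = new Conf()
// Only an explicit upcast (or a custom Traversable that is not also an Iterable)
// selects the deprecated overload once sources are recompiled:
conf.setAll(Map("spark.app.name" -> "test"): Traversable[(String, String)])
```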

Member Author:
One reason I put this forth is that the TraversableOnce problem is harder; its replacement IterableOnce doesn't exist in Scala 2.12. The same overload strategy doesn't work out quite right for the places it's used (generic types and function signatures make the overload ambiguous). I am considering proposing just changing the type there for 3.0.

Member:
I can see setAll() is being used, though. But maybe it won't work even after a rebuild (binary compatibility)?

Member Author:
Calls to setAll will now bind to the more specific Iterable-arg version in all cases in Spark, and I'm not aware of any Scala collection classes that aren't Iterable to begin with. One consequence, I suppose, is that passing an Iterator will use the deprecated method. But it'll still be there. I don't anticipate we'll remove it until we want to support Scala 2.13, and that won't be in 3.0. For now, no method is removed, so there should be no problem.
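A hedged illustration (again using the hypothetical `Conf` sketch above) of how typical call sites resolve:

```scala
val conf = new Conf()
// Ordinary collections are Iterable, so these bind to the new, non-deprecated overload:
conf.setAll(Map("spark.app.name" -> "test"))
conf.setAll(Seq("spark.master" -> "local[2]", "spark.ui.enabled" -> "false"))
```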

def setAll(settings: Traversable[(String, String)]): SparkConf = {
settings.foreach { case (k, v) => set(k, v) }
this
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2556,7 +2556,7 @@ object SparkContext extends Logging {
private[spark] val DRIVER_IDENTIFIER = "driver"


private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Traversable[T])
private implicit def arrayToArrayWritable[T <: Writable : ClassTag](arr: Iterable[T])
: ArrayWritable = {
def anyToWritable[U <: Writable](u: U): Writable = u

8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/Distribution.scala
@@ -31,7 +31,7 @@ import scala.collection.immutable.IndexedSeq
*/
private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
require(startIdx < endIdx)
def this(data: Traversable[Double]) = this(data.toArray, 0, data.size)
def this(data: Iterable[Double]) = this(data.toArray, 0, data.size)
java.util.Arrays.sort(data, startIdx, endIdx)
val length = endIdx - startIdx

@@ -42,7 +42,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {
* given from 0 to 1
* @param probabilities
*/
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
def getQuantiles(probabilities: Iterable[Double] = defaultProbabilities)
: IndexedSeq[Double] = {
probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) }
}
@@ -75,15 +75,15 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int) {

private[spark] object Distribution {

def apply(data: Traversable[Double]): Option[Distribution] = {
def apply(data: Iterable[Double]): Option[Distribution] = {
if (data.size > 0) {
Some(new Distribution(data))
} else {
None
}
}

def showQuantiles(out: PrintStream = System.out, quantiles: Traversable[Double]) {
def showQuantiles(out: PrintStream = System.out, quantiles: Iterable[Double]) {
// scalastyle:off println
out.println("min\t25%\t50%\t75%\tmax")
quantiles.foreach{q => out.print(q + "\t")}
@@ -309,7 +309,7 @@ private[spark] object JsonProtocol {

private lazy val accumulableBlacklist = Set("internal.metrics.updatedBlockStatuses")

def accumulablesToJson(accumulables: Traversable[AccumulableInfo]): JArray = {
def accumulablesToJson(accumulables: Iterable[AccumulableInfo]): JArray = {
JArray(accumulables
.filterNot(_.name.exists(accumulableBlacklist.contains))
.toList.map(accumulableInfoToJson))
@@ -535,7 +535,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
private def checkNonZeroAvg(m: Traversable[Long], msg: String) {
private def checkNonZeroAvg(m: Iterable[Long], msg: String) {
assert(m.sum / m.size.toDouble > 0.0, msg)
}

@@ -83,7 +83,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
UserGroupInformation.getCurrentUser.addCredentials(creds)
}

protected def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
protected def setSparkEnv(settings: Iterable[(String, String)]): Unit = {
val conf = new SparkConf().setAll(settings)
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
@@ -55,7 +55,7 @@ object LabelPropagation {
val count1Val = count1.getOrElse(i, 0L)
val count2Val = count2.getOrElse(i, 0L)
i -> (count1Val + count2Val)
}(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
}(collection.breakOut)
}
def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
if (message.isEmpty) attr else message.maxBy(_._2)._1
@@ -36,7 +36,7 @@ object ShortestPaths extends Serializable {
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}(collection.breakOut) // more efficient alternative to [[collection.Traversable.toMap]]
}(collection.breakOut)
}

/**
@@ -63,7 +63,7 @@ object AttributeSet {
* when the transformation was a no-op).
*/
class AttributeSet private (val baseSet: Set[AttributeEquals])
extends Traversable[Attribute] with Serializable {
extends Iterable[Attribute] with Serializable {

override def hashCode: Int = baseSet.hashCode()

@@ -99,7 +99,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
* Returns a new [[AttributeSet]] that does not contain any of the [[Attribute Attributes]] found
* in `other`.
*/
def --(other: Traversable[NamedExpression]): AttributeSet = {
def --(other: Iterable[NamedExpression]): AttributeSet = {
other match {
case otherSet: AttributeSet =>
new AttributeSet(baseSet -- otherSet.baseSet)
@@ -253,7 +253,7 @@ abstract class Expression extends TreeNode[Expression] {
def prettyName: String = nodeName.toLowerCase(Locale.ROOT)

protected def flatArguments: Iterator[Any] = productIterator.flatMap {
case t: Traversable[_] => t
case t: Iterable[_] => t
case single => single :: Nil
}

@@ -183,7 +183,7 @@ trait Block extends TreeNode[Block] with JavaCode {
def doTransform(arg: Any): AnyRef = arg match {
case e: ExprValue => transform(e)
case Some(value) => Some(doTransform(value))
case seq: Traversable[_] => seq.map(doTransform)
case seq: Iterable[_] => seq.map(doTransform)
Member Author:
This is the only part of the change that is of concern. It's internal, but this now only matches Iterable and not Traversable, which are not the same in Scala 2.12. However, AFAICT, every standard class we'd expect here extends Iterable. See https://docs.scala-lang.org/overviews/collections/overview.html; Seq, Set, and Map are all Iterable already. This shouldn't change behavior, but tests will help prove that.
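As a quick standalone sanity check (not part of the patch; `classify` is just an illustrative helper), the collections we expect to flow through here all satisfy the Iterable pattern, while an Iterator does not:

```scala
// Standalone check: standard collections all match the Iterable case;
// only something like an Iterator falls through to the default.
def classify(arg: Any): String = arg match {
  case _: Iterable[_] => "iterable"
  case _              => "other"
}

assert(classify(Seq(1, 2)) == "iterable")
assert(classify(Set(1, 2)) == "iterable")
assert(classify(Map(1 -> 2)) == "iterable")
assert(classify(Iterator(1, 2)) == "other")
```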

case other: AnyRef => other
}

@@ -119,7 +119,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
case m: Map[_, _] => m
case d: DataType => d // Avoid unpacking Structs
case stream: Stream[_] => stream.map(recursiveTransform).force
case seq: Traversable[_] => seq.map(recursiveTransform)
case seq: Iterable[_] => seq.map(recursiveTransform)
case other: AnyRef => other
case null => null
}
@@ -142,16 +142,16 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
/** Returns all of the expressions present in this query plan operator. */
final def expressions: Seq[Expression] = {
// Recursively find all expressions from a traversable.
def seqToExpressions(seq: Traversable[Any]): Traversable[Expression] = seq.flatMap {
def seqToExpressions(seq: Iterable[Any]): Iterable[Expression] = seq.flatMap {
case e: Expression => e :: Nil
case s: Traversable[_] => seqToExpressions(s)
case s: Iterable[_] => seqToExpressions(s)
case other => Nil
}

productIterator.flatMap {
case e: Expression => e :: Nil
case s: Some[_] => seqToExpressions(s.toSeq)
case seq: Traversable[_] => seqToExpressions(seq)
case seq: Iterable[_] => seqToExpressions(seq)
case other => Nil
}.toSeq
}
@@ -353,7 +353,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
}.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
case args: Traversable[_] => args.map(mapChild)
case args: Iterable[_] => args.map(mapChild)
case nonChild: AnyRef => nonChild
case null => null
}