@@ -97,7 +97,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging
   }

   def getShards(): Seq[Shard] = {
-    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala
+    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
   }

   def splitShard(shardId: String): Unit = {
@@ -137,7 +137,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Logging
    * Expose a Python friendly API.
    */
   def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala, aggregate = false)
+    pushData(testData.asScala.toSeq, aggregate = false)
   }

   def deleteStream(): Unit = {
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
       sentSeqNumbers += ((num, seqNumber))
     }

-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
  }
}
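The changes in this file, like most of the diff below, make collection conversions explicit so the code also compiles when cross-built for Scala 2.13, where scala.Seq aliases scala.collection.immutable.Seq (on 2.12 it aliased scala.collection.Seq). asScala on a Java list yields a mutable.Buffer, which no longer satisfies a Seq return type, hence the added .toSeq copies. A minimal sketch of the getShards change, with illustrative names standing in for the real Kinesis client:

    import scala.collection.JavaConverters._

    val javaShards: java.util.List[String] = java.util.Arrays.asList("shard-0", "shard-1")

    // asScala wraps the Java list as a scala.collection.mutable.Buffer.
    val buffer = javaShards.asScala

    // 2.12: Buffer is a scala.Seq, so the method body compiles without a copy.
    // 2.13: scala.Seq = immutable.Seq, so an explicit .toSeq conversion is required.
    def getShards(): Seq[String] = buffer.toSeq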
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataGenerator
       Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService)
     }
     producer.flushSync()
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
   }
 }
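The mapValues(_.toSeq).toMap rewrite handles two 2.13 differences at once: the mutable buffer values must become immutable Seqs, and mapValues no longer returns a Map but a lazy MapView, so .toMap materializes it (mapValues is also deprecated on 2.13 in favour of .view.mapValues). A rough sketch, assuming shardIdToSeqNumbers is a mutable map of buffers as the surrounding code suggests:

    import scala.collection.mutable

    val shardIdToSeqNumbers = mutable.HashMap(
      "shard-0" -> mutable.ArrayBuffer(1 -> "seq-a"),
      "shard-1" -> mutable.ArrayBuffer(2 -> "seq-b"))

    // 2.13: mapValues returns a MapView, not a Map; toMap forces a strict immutable Map.
    val result: Map[String, Seq[(Int, String)]] =
      shardIdToSeqNumbers.mapValues(_.toSeq).toMap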
@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
     require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")

     shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-    shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
-    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+    shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
+    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
     shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
       val seqNumRange = SequenceNumberRange(
         testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
@@ -62,7 +62,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
           .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
           .list()
           .getItems
-          .asScala)
+          .asScala.toSeq)
     }
   }

@@ -131,7 +131,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: ScheduledExecutorService)
     try {
       val snapshots = new ArrayList[ExecutorPodsSnapshot]()
       snapshotsBuffer.drainTo(snapshots)
-      onNewSnapshots(snapshots.asScala)
+      onNewSnapshots(snapshots.asScala.toSeq)
     } catch {
       case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
     } finally {
@@ -35,7 +35,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
   override def stop(): Unit = {}

   override def notifySubscribers(): Unit = {
-    subscribers.foreach(_(snapshotsBuffer))
+    subscribers.foreach(_(snapshotsBuffer.toSeq))
     snapshotsBuffer.clear()
   }

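Beyond satisfying the compiler, .toSeq here arguably changes behaviour for the better: on 2.12, SeqLike.toSeq returned the buffer itself, so a subscriber that retained the passed collection would have observed the clear() that follows, whereas on 2.13 toSeq builds an immutable copy. A small illustration of that difference:

    import scala.collection.mutable

    val snapshotsBuffer = mutable.ArrayBuffer(1, 2, 3)
    val handedToSubscriber: Seq[Int] = snapshotsBuffer.toSeq
    snapshotsBuffer.clear()

    // 2.13: handedToSubscriber is still Seq(1, 2, 3) because toSeq copied.
    // 2.12: toSeq aliased the buffer, so the elements would be gone after clear().
    assert(handedToSubscriber == Seq(1, 2, 3))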
@@ -47,6 +47,6 @@ object ProcessUtils extends Logging {
     assert(proc.exitValue == 0,
       s"Failed to execute ${fullCommand.mkString(" ")}" +
       s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
-    outputLines
+    outputLines.toSeq
   }
 }
@@ -383,13 +383,13 @@ private[spark] class MesosClusterScheduler(
     taskId.split(s"${RETRY_SEP}").head
   }

-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+  private def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B) = {
     m.updated(k, f(m.getOrElse(k, default)))
   }

   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
     // TODO(mgummelt): Don't do this here. This should be passed as a --conf
-    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+    val commandEnv = adjust(desc.command.environment.toMap, "SPARK_SUBMIT_OPTS", "")(
       v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
     )

@@ -686,14 +686,14 @@ private[spark] class MesosClusterScheduler(
     }

     scheduleTasks(
-      copyBuffer(driversToRetry),
+      copyBuffer(driversToRetry).toSeq,
       removeFromPendingRetryDrivers,
       currentOffers,
       tasks)

     // Then we walk through the queued drivers and try to schedule them.
     scheduleTasks(
-      copyBuffer(queuedDrivers),
+      copyBuffer(queuedDrivers).toSeq,
       removeFromQueuedDrivers,
       currentOffers,
       tasks)
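The adjust signature narrows from collection.Map to (immutable) Map, which appears to be because the generic read-only collection.Map no longer offers updated on 2.13 (updated lives on the immutable maps), so the call site converts with .toMap. A compiling sketch of that shape, with environment as a hypothetical stand-in for desc.command.environment:

    // Stand-in value; the real code reads desc.command.environment.
    val environment: scala.collection.Map[String, String] = Map("PATH" -> "/usr/bin")

    def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B): Map[A, B] =
      m.updated(k, f(m.getOrElse(k, default)))

    // .toMap copies the read-only view into an immutable Map, which has updated():
    val env = adjust(environment.toMap, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dsome.key=value")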
@@ -491,8 +491,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)

     // offerID -> resources
-    val remainingResources = mutable.Map(offers.map(offer =>
-      (offer.getId.getValue, offer.getResourcesList)): _*)
+    val remainingResources = mutable.Map[String, JList[Resource]]()
+    remainingResources ++= offers.map(offer => (offer.getId.getValue, offer.getResourcesList))

     var launchTasks = true

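The varargs constructor disappears here because 2.13 requires the sequence in a `: _*` splice to be an immutable.Seq; offers is likely a mutable.Buffer (from asScala on the Java offers list), so offers.map(...) no longer adapts to the splice. Building the map empty and bulk-adding with ++= works on both versions. A reduced example:

    import scala.collection.mutable

    val pairs: mutable.Buffer[(String, Int)] = mutable.Buffer("offer-1" -> 1, "offer-2" -> 2)

    // 2.13 rejects mutable.Map(pairs: _*): a _* splice now needs an immutable.Seq.
    // Creating the map and bulk-adding avoids the splice entirely:
    val remaining = mutable.Map[String, Int]()
    remaining ++= pairs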
@@ -379,7 +379,7 @@ trait MesosSchedulerUtils extends Logging {
         } else {
           v.split(',').toSet
         }
-      )
+      ).toMap
     } catch {
       case NonFatal(e) =>
         throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
@@ -146,7 +146,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar
     val resources = taskInfo.getResourcesList
     assert(scheduler.getResource(resources, "cpus") == 1.5)
     assert(scheduler.getResource(resources, "mem") == 1200)
-    val resourcesSeq: Seq[Resource] = resources.asScala
+    val resourcesSeq: Seq[Resource] = resources.asScala.toSeq
     val cpus = resourcesSeq.filter(_.getName == "cpus").toList
     assert(cpus.size == 2)
     assert(cpus.exists(_.getRole() == "role2"))
@@ -267,7 +267,8 @@ class MesosFineGrainedSchedulerBackendSuite
       properties = new Properties(),
       resources = immutable.Map.empty[String, ResourceInformation],
       ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(2)

     val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
@@ -379,7 +380,8 @@ class MesosFineGrainedSchedulerBackendSuite
       properties = new Properties(),
       resources = immutable.Map.empty[String, ResourceInformation],
       ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(1)

     val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
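resourceOffers takes an IndexedSeq, which under 2.13 means immutable.IndexedSeq, so the test's buffer (presumably an ArrayBuffer) must be converted before it can appear in the stubbed call. Mockito matches stub arguments by equals(), and Seq equality is element-wise, so the converted copy still matches the collection the backend builds at runtime. Reusing the test's own names:

    // 2.12: scala.IndexedSeq = scala.collection.IndexedSeq, so the buffer was accepted as-is.
    // 2.13: scala.IndexedSeq = scala.collection.immutable.IndexedSeq, hence the conversion:
    when(taskScheduler.resourceOffers(expectedWorkerOffers.toIndexedSeq))
      .thenReturn(Seq(Seq(taskDesc)))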
@@ -553,7 +553,7 @@ private[spark] class Client(
         }

         // Propagate the local URIs to the containers using the configuration.
-        sparkConf.set(SPARK_JARS, localJars)
+        sparkConf.set(SPARK_JARS, localJars.toSeq)

       case None =>
         // No configuration, so fall back to uploading local jar files.
@@ -628,7 +628,7 @@ private[spark] class Client(
       }
     }
     if (cachedSecondaryJarLinks.nonEmpty) {
-      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
+      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq)
     }

     if (isClusterMode && args.primaryPyFile != null) {
@@ -91,11 +91,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
    * Writes down information about cached files needed in executors to the given configuration.
    */
   def updateConfiguration(conf: SparkConf): Unit = {
-    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
-    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
-    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
-    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
-    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
+    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString).toSeq)
+    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size).toSeq)
+    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime).toSeq)
+    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()).toSeq)
+    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()).toSeq)
   }

   /**
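These setters take a typed config entry whose value type is Seq[...], and the setter is invariant in that type, so under 2.13 a mapped mutable buffer no longer fits and each call gains .toSeq. A stripped-down model of why the invariant parameter forces the copy (Entry and set here are stand-ins, not Spark's real ConfigEntry API):

    import scala.collection.mutable

    final case class Entry[T](key: String)
    def set[T](entry: Entry[T], value: T): Unit = ()

    val distCacheEntries = mutable.ListBuffer("hdfs:///tmp/a", "hdfs:///tmp/b")
    val cachedFiles = Entry[Seq[String]]("example.cache.files")

    // 2.13: ListBuffer#map yields a ListBuffer, which is not an (immutable) Seq[String],
    // so the value must be converted to satisfy T = Seq[String]:
    set(cachedFiles, distCacheEntries.map(_.trim).toSeq)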
@@ -296,7 +296,7 @@ private[yarn] class YarnAllocator(
       val profResource = rpIdToYarnResource.get(id)
       val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
         .asScala.flatMap(_.asScala)
-      allContainerRequests(id) = result
+      allContainerRequests(id) = result.toSeq
     }
     allContainerRequests.toMap
   }
@@ -426,13 +426,13 @@ private[yarn] class YarnAllocator(
           getNumExecutorsStarting,
           allocateResponse.getAvailableResources))

-      handleAllocatedContainers(allocatedContainers.asScala)
+      handleAllocatedContainers(allocatedContainers.asScala.toSeq)
     }

     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
-      processCompletedContainers(completedContainers.asScala)
+      processCompletedContainers(completedContainers.asScala.toSeq)
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
         .format(completedContainers.size, getNumExecutorsRunning))
     }
@@ -960,7 +960,7 @@ private[yarn] class YarnAllocator(
       }
     }

-    (localityMatched, localityUnMatched, localityFree)
+    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
   }

 }
@@ -73,8 +73,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt, resourceProfile)
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))
@@ -37,7 +37,7 @@ private[hive] trait SparkOperation extends Operation with Logging {

   protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString()

-  protected def cleanup(): Unit = Unit // noop by default
+  protected def cleanup(): Unit = () // noop by default

   abstract override def run(): Unit = {
     withLocalProperties {
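This last change is unrelated to collections: Unit on the right-hand side is the companion object scala.Unit, not the unit value, so the old definition only compiled by silently discarding that object, which draws warnings under common lint settings. The fix returns (), the actual unit value:

    def wrong(): Unit = Unit // returns the Unit companion object, discarded to ()
    def right(): Unit = ()   // the unit value itself; what was meant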