@@ -104,7 +104,8 @@ case class SparkListenerJobEnd(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
case class SparkListenerEnvironmentUpdate(
environmentDetails: Map[String, collection.Seq[(String, String)]])
extends SparkListenerEvent
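As an aside, the sketch below (not part of the diff; the class names are made up for illustration) shows the cross-version issue this change works around: on Scala 2.13 the default `Seq` alias is `scala.collection.immutable.Seq`, so values obtained from Java collections via `asScala` (mutable buffers) can only populate a `Seq`-typed field after a copying `.toSeq`, whereas a field typed `collection.Seq` accepts them directly on both 2.12 and 2.13.

```scala
import scala.jdk.CollectionConverters._  // assumes Scala 2.13; on 2.12 the equivalent is scala.collection.JavaConverters

object SeqAliasSketch {
  // Hypothetical stand-ins for the UI/event classes touched in this diff.
  final case class Strict(values: Seq[String])              // scala.Seq = immutable.Seq on 2.13
  final case class Relaxed(values: collection.Seq[String])  // accepts mutable and immutable sequences

  def main(args: Array[String]): Unit = {
    // asScala wraps the Java list in a mutable Buffer without copying.
    val fromJava = java.util.Arrays.asList("a", "b", "c").asScala

    val copied = Strict(fromJava.toSeq)  // extra O(n) copy on Scala 2.13
    val shared = Relaxed(fromJava)       // stored as-is, no copy on either Scala version

    println((copied.values.size, shared.values.size))  // (3, 3)
  }
}
```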

@DeveloperApi
@@ -168,7 +168,7 @@ private[spark] class AppStatusListener(
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val details = event.environmentDetails

val jvmInfo = Map(details("JVM Information"): _*)
val jvmInfo = details("JVM Information").toMap
val runtime = new v1.RuntimeInfo(
jvmInfo.get("Java Version").orNull,
jvmInfo.get("Java Home").orNull,
@@ -704,7 +704,7 @@ private[spark] class AppStatusStore(
store.read(classOf[RDDOperationGraphWrapper], stageId).toRDDOperationGraph()
}

def operationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
def operationGraphForJob(jobId: Int): collection.Seq[RDDOperationGraph] = {
val job = store.read(classOf[JobDataWrapper], jobId)
val stages = job.info.stageIds.sorted

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -543,14 +543,14 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

var value: v1.RDDPartitionInfo = null

def executors: Seq[String] = value.executors
def executors: collection.Seq[String] = value.executors

def memoryUsed: Long = value.memoryUsed

def diskUsed: Long = value.diskUsed

def update(
executors: Seq[String],
executors: collection.Seq[String],
memoryUsed: Long,
diskUsed: Long): Unit = {
val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
24 changes: 12 additions & 12 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -39,7 +39,7 @@ case class ApplicationInfo private[spark](
maxCores: Option[Int],
coresPerExecutor: Option[Int],
memoryPerExecutorMB: Option[Int],
attempts: Seq[ApplicationAttemptInfo])
attempts: collection.Seq[ApplicationAttemptInfo])

@JsonIgnoreProperties(
value = Array("startTimeEpoch", "endTimeEpoch", "lastUpdatedEpoch"),
@@ -196,7 +196,7 @@ class JobData private[spark](
val description: Option[String],
val submissionTime: Option[Date],
val completionTime: Option[Date],
val stageIds: Seq[Int],
val stageIds: collection.Seq[Int],
val jobGroup: Option[String],
val status: JobExecutionStatus,
val numTasks: Int,
@@ -220,8 +220,8 @@ class RDDStorageInfo private[spark](
val storageLevel: String,
val memoryUsed: Long,
val diskUsed: Long,
val dataDistribution: Option[Seq[RDDDataDistribution]],
val partitions: Option[Seq[RDDPartitionInfo]])
val dataDistribution: Option[collection.Seq[RDDDataDistribution]],
val partitions: Option[collection.Seq[RDDPartitionInfo]])

class RDDDataDistribution private[spark](
val address: String,
@@ -242,7 +242,7 @@ class RDDPartitionInfo private[spark](
val storageLevel: String,
val memoryUsed: Long,
val diskUsed: Long,
val executors: Seq[String])
val executors: collection.Seq[String])

class StageData private[spark](
val status: StageStatus,
@@ -318,7 +318,7 @@ class TaskData private[spark](
val status: String,
val taskLocality: String,
val speculative: Boolean,
val accumulatorUpdates: Seq[AccumulableInfo],
val accumulatorUpdates: collection.Seq[AccumulableInfo],
val errorMessage: Option[String] = None,
val taskMetrics: Option[TaskMetrics] = None,
val executorLogs: Map[String, String],
@@ -456,12 +456,12 @@ class VersionInfo private[spark](
// REST call, they are not stored with it.
class ApplicationEnvironmentInfo private[spark] (
val runtime: RuntimeInfo,
val sparkProperties: Seq[(String, String)],
val hadoopProperties: Seq[(String, String)],
val systemProperties: Seq[(String, String)],
val metricsProperties: Seq[(String, String)],
val classpathEntries: Seq[(String, String)],
val resourceProfiles: Seq[ResourceProfileInfo])
val sparkProperties: collection.Seq[(String, String)],
val hadoopProperties: collection.Seq[(String, String)],
val systemProperties: collection.Seq[(String, String)],
val metricsProperties: collection.Seq[(String, String)],
val classpathEntries: collection.Seq[(String, String)],
val resourceProfiles: collection.Seq[ResourceProfileInfo])

class RuntimeInfo private[spark](
val javaVersion: String,
@@ -86,13 +86,13 @@ class ApplicationEnvironmentInfoWrapperSerializer extends ProtobufSerDe {
}
new ApplicationEnvironmentInfo(
runtime = runtime,
sparkProperties = info.getSparkPropertiesList.asScala.map(pairSSToTuple).toSeq,
hadoopProperties = info.getHadoopPropertiesList.asScala.map(pairSSToTuple).toSeq,
systemProperties = info.getSystemPropertiesList.asScala.map(pairSSToTuple).toSeq,
metricsProperties = info.getMetricsPropertiesList.asScala.map(pairSSToTuple).toSeq,
classpathEntries = info.getClasspathEntriesList.asScala.map(pairSSToTuple).toSeq,
sparkProperties = info.getSparkPropertiesList.asScala.map(pairSSToTuple),
hadoopProperties = info.getHadoopPropertiesList.asScala.map(pairSSToTuple),
systemProperties = info.getSystemPropertiesList.asScala.map(pairSSToTuple),
metricsProperties = info.getMetricsPropertiesList.asScala.map(pairSSToTuple),
classpathEntries = info.getClasspathEntriesList.asScala.map(pairSSToTuple),
resourceProfiles =
info.getResourceProfilesList.asScala.map(deserializeResourceProfileInfo).toSeq
info.getResourceProfilesList.asScala.map(deserializeResourceProfileInfo)
)
}

@@ -74,7 +74,7 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe {
val maxCores = getOptional(info.hasMaxCores, info.getMaxCores)
val coresPerExecutor = getOptional(info.hasCoresPerExecutor, info.getCoresPerExecutor)
val memoryPerExecutorMB = getOptional(info.hasMemoryPerExecutorMb, info.getMemoryPerExecutorMb)
val attempts = info.getAttemptsList.asScala.map(deserializeApplicationAttemptInfo).toSeq
val attempts = info.getAttemptsList.asScala.map(deserializeApplicationAttemptInfo)
ApplicationInfo(
id = info.getId,
name = info.getName,
@@ -97,7 +97,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
description = description,
submissionTime = submissionTime,
completionTime = completionTime,
stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq,
stageIds = info.getStageIdsList.asScala.map(_.toInt),
jobGroup = jobGroup,
status = status,
numTasks = info.getNumTasks,
@@ -96,10 +96,10 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe {
if (info.getDataDistributionList.isEmpty) {
None
} else {
Some(info.getDataDistributionList.asScala.map(deserializeRDDDataDistribution).toSeq)
Some(info.getDataDistributionList.asScala.map(deserializeRDDDataDistribution))
},
partitions =
Some(info.getPartitionsList.asScala.map(deserializeRDDPartitionInfo).toSeq)
Some(info.getPartitionsList.asScala.map(deserializeRDDPartitionInfo))
)
}

@@ -126,7 +126,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe {
storageLevel = info.getStorageLevel,
memoryUsed = info.getMemoryUsed,
diskUsed = info.getDiskUsed,
executors = info.getExecutorsList.asScala.toSeq
executors = info.getExecutorsList.asScala
)
}
}
@@ -102,7 +102,7 @@ class TaskDataWrapperSerializer extends ProtobufSerDe {
status = weakIntern(binary.getStatus),
taskLocality = weakIntern(binary.getTaskLocality),
speculative = binary.getSpeculative,
accumulatorUpdates = accumulatorUpdates.toSeq,
accumulatorUpdates = accumulatorUpdates,
errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage),
hasMetrics = binary.getHasMetrics,
executorDeserializeTime = binary.getExecutorDeserializeTime,
@@ -180,7 +180,7 @@ private[spark] class TaskDataWrapper(
@KVIndexParam(value = TaskIndexNames.LOCALITY, parent = TaskIndexNames.STAGE)
val taskLocality: String,
val speculative: Boolean,
val accumulatorUpdates: Seq[AccumulableInfo],
val accumulatorUpdates: collection.Seq[AccumulableInfo],
val errorMessage: Option[String],

val hasMetrics: Boolean,
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -43,7 +43,7 @@ private[spark] abstract class PagedDataSource[T](val pageSize: Int) {
/**
* Slice a range of data.
*/
protected def sliceData(from: Int, to: Int): Seq[T]
protected def sliceData(from: Int, to: Int): collection.Seq[T]

/**
* Slice the data for this page
@@ -76,7 +76,7 @@ private[spark] abstract class PagedDataSource[T](val pageSize: Int) {
* The data returned by `PagedDataSource.pageData`, including the page number, the number of total
* pages and the data in this page.
*/
private[ui] case class PageData[T](totalPage: Int, data: Seq[T])
private[ui] case class PageData[T](totalPage: Int, data: collection.Seq[T])

/**
* A paged table that will generate a HTML table for a specified page and also the page navigation.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -492,7 +492,8 @@ private[spark] object UIUtils extends Logging {
}

/** Return a "DAG visualization" DOM element that expands into a visualization for a job. */
def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = {
def showDagVizForJob(jobId: Int,
graphs: collection.Seq[RDDOperationGraph]): collection.Seq[Node] = {
showDagViz(graphs, forJob = true)
}

@@ -503,7 +504,8 @@
* a format that is expected by spark-dag-viz.js. Any changes in the format here must be
* reflected there.
*/
private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = {
private def showDagViz(
graphs: collection.Seq[RDDOperationGraph], forJob: Boolean): collection.Seq[Node] = {
<div>
<span id={if (forJob) "job-dag-viz" else "stage-dag-viz"}
class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -160,7 +160,7 @@ private[ui] case class BlockTableRowData(
executors: String)

private[ui] class BlockDataSource(
rddPartitions: Seq[RDDPartitionInfo],
rddPartitions: collection.Seq[RDDPartitionInfo],
pageSize: Int,
sortColumn: String,
desc: Boolean,
@@ -170,7 +170,7 @@ private[ui] class BlockDataSource(

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[BlockTableRowData] = {
override def sliceData(from: Int, to: Int): collection.Seq[BlockTableRowData] = {
data.slice(from, to)
}

@@ -210,7 +210,7 @@ private[ui] class BlockPagedTable(
request: HttpServletRequest,
rddTag: String,
basePath: String,
rddPartitions: Seq[RDDPartitionInfo],
rddPartitions: collection.Seq[RDDPartitionInfo],
executorSummaries: Seq[ExecutorSummary]) extends PagedTable[BlockTableRowData] {

private val (sortColumn, desc, pageSize) = getTableParameters(request, rddTag, "Block Name")
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2808,7 +2808,8 @@ private[spark] object Utils extends Logging {
* Redact the sensitive values in the given map. If a map key matches the redaction pattern then
* its value is replaced with a dummy text.
*/
def redact(conf: SparkConf, kvs: Seq[(String, String)]): Seq[(String, String)] = {
def redact(conf: SparkConf,
kvs: scala.collection.Seq[(String, String)]): scala.collection.Seq[(String, String)] = {
val redactionPattern = conf.get(SECRET_REDACTION_PATTERN)
redact(redactionPattern, kvs)
}
@@ -2817,7 +2818,8 @@
* Redact the sensitive values in the given map. If a map key matches the redaction pattern then
* its value is replaced with a dummy text.
*/
def redact[K, V](regex: Option[Regex], kvs: Seq[(K, V)]): Seq[(K, V)] = {
def redact[K, V](regex: Option[Regex],
kvs: scala.collection.Seq[(K, V)]): scala.collection.Seq[(K, V)] = {
regex match {
case None => kvs
case Some(r) => redact(r, kvs)
@@ -2839,7 +2841,8 @@
}
}

private def redact[K, V](redactionPattern: Regex, kvs: Seq[(K, V)]): Seq[(K, V)] = {
private def redact[K, V](redactionPattern: Regex,
kvs: scala.collection.Seq[(K, V)]): scala.collection.Seq[(K, V)] = {
// If the sensitive information regex matches with either the key or the value, redact the value
// While the original intent was to only redact the value if the key matched with the regex,
// we've found that especially in verbose mode, the value of the property may contain sensitive
@@ -2865,7 +2868,7 @@
.getOrElse((key, value))
case (key, value) =>
(key, value)
}.asInstanceOf[Seq[(K, V)]]
}.asInstanceOf[scala.collection.Seq[(K, V)]]
}

/**
@@ -2874,7 +2877,7 @@ private[spark] object Utils extends Logging {
* redacted. So theoretically, the property itself could be configured to redact its own value
* when printing.
*/
def redact(kvs: Map[String, String]): Seq[(String, String)] = {
def redact(kvs: Map[String, String]): scala.collection.Seq[(String, String)] = {
val redactionPattern = kvs.getOrElse(
SECRET_REDACTION_PATTERN.key,
SECRET_REDACTION_PATTERN.defaultValueString
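As a side note on the redaction logic above, here is a self-contained sketch (the placeholder text and object name are illustrative, not Spark's exact internals) of regex-based redaction over `collection.Seq[(K, V)]`: if the pattern matches either the key or the value, the value is replaced.

```scala
import scala.util.matching.Regex

object RedactSketch {
  private val Placeholder = "*********(redacted)"

  // Mirrors the shape of Utils.redact: keep the pair unless key or value matches the pattern.
  def redact[K, V](pattern: Regex, kvs: collection.Seq[(K, V)]): collection.Seq[(K, V)] =
    kvs.map {
      case (key: String, value: String)
          if pattern.findFirstIn(key).isDefined || pattern.findFirstIn(value).isDefined =>
        (key, Placeholder)
      case other => other
    }.asInstanceOf[collection.Seq[(K, V)]]  // same cast trick as the original code above

  def main(args: Array[String]): Unit = {
    val props = collection.Seq(
      "spark.app.name" -> "demo",
      "spark.hadoop.fs.s3a.secret.key" -> "hunter2")
    // "spark.app.name" is kept as-is; the secret key's value is replaced.
    redact("(?i)secret|password|token".r, props).foreach(println)
  }
}
```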
@@ -1090,10 +1090,11 @@ private[spark] object JsonProtocolSuite extends Assertions {
}

private def assertEquals(
details1: Map[String, Seq[(String, String)]],
details2: Map[String, Seq[(String, String)]]): Unit = {
details1: Map[String, scala.collection.Seq[(String, String)]],
details2: Map[String, scala.collection.Seq[(String, String)]]): Unit = {
details1.zip(details2).foreach {
case ((key1, values1: Seq[(String, String)]), (key2, values2: Seq[(String, String)])) =>
case ((key1, values1: scala.collection.Seq[(String, String)]),
(key2, values2: scala.collection.Seq[(String, String)])) =>
assert(key1 === key2)
values1.zip(values2).foreach { case (v1, v2) => assert(v1 === v2) }
}
@@ -1130,7 +1131,10 @@
}
}

private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit): Unit = {
private def assertSeqEquals[T](
seq1: scala.collection.Seq[T],
seq2: scala.collection.Seq[T],
assertEquals: (T, T) => Unit): Unit = {
assert(seq1.length === seq2.length)
seq1.zip(seq2).foreach { case (t1, t2) =>
assertEquals(t1, t2)
22 changes: 21 additions & 1 deletion project/MimaExcludes.scala
@@ -129,7 +129,27 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.this"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.BlockManagerMessages$RegisterBlockManager$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.BlockManagerMessages#RegisterBlockManager.apply"),

// [SPARK-41709][CORE][SQL][UI] Explicitly define Seq as collection.Seq to avoid toSeq when create ui objects from protobuf objects for Scala 2.13
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.sparkProperties"),
Contributor Author: Scala 2.12 does not need these exclude filters added; all of the changes only affect Scala 2.13.

ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.hadoopProperties"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.systemProperties"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.classpathEntries"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.resourceProfiles"),
Contributor Author: We need to add MimaExcludes for Scala 2.13; should we close this one?

Contributor Author: If this is acceptable, I will make more changes; otherwise I will close this PR.

Member: I think those are all internal methods, right? People wouldn't call these classes in code.

Contributor Author: Yes, the MiMa filters to be added are all internal APIs.

ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.apply"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.attempts"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.copy"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.copy$default$7"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.ApplicationInfo.apply"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.JobData.stageIds"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.JobData.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.RDDPartitionInfo.executors"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.RDDPartitionInfo.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.TaskData.accumulatorUpdates"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.TaskData.this")

)

// Defulat exclude rules
@@ -5012,7 +5012,7 @@ class SQLConf extends Serializable with Logging {
/**
* Redacts the given option map according to the description of SQL_OPTIONS_REDACTION_PATTERN.
*/
def redactOptions[K, V](options: Seq[(K, V)]): Seq[(K, V)] = {
def redactOptions[K, V](options: collection.Seq[(K, V)]): collection.Seq[(K, V)] = {
val regexes = Seq(
getConf(SQL_OPTIONS_REDACTION_PATTERN),
SECRET_REDACTION_PATTERN.readFrom(reader))
@@ -486,7 +486,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var details: String = null
var physicalPlanDescription: String = null
var modifiedConfigs: Map[String, String] = _
var metrics = Seq[SQLPlanMetric]()
var metrics = collection.Seq[SQLPlanMetric]()
var submissionTime = -1L
var completionTime: Option[Date] = None
var errorMessage: Option[String] = None
@@ -87,7 +87,7 @@ class SQLExecutionUIData(
val details: String,
val physicalPlanDescription: String,
val modifiedConfigs: Map[String, String],
val metrics: Seq[SQLPlanMetric],
val metrics: collection.Seq[SQLPlanMetric],
val submissionTime: Long,
val completionTime: Option[Date],
val errorMessage: Option[String],
@@ -64,7 +64,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
getOptional(ui.hasCompletionTime, () => new Date(ui.getCompletionTime))
val errorMessage = getOptional(ui.hasErrorMessage, () => ui.getErrorMessage)
val metrics =
ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)).toSeq
ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m))
Contributor Author: Without toSeq, one operation takes about 10ns. After adding toSeq, the latency increases linearly with the length of the original data:

  • 240ns when the input length is 10
  • 1740ns when the input length is 100
  • 16600ns when the input length is 1000
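A standalone illustration of where that linear cost comes from (the sizes here are arbitrary, and this is not the benchmark harness behind the numbers above): `.asScala` is an O(1) wrapper over the protobuf-provided Java list, while `.toSeq` on Scala 2.13 copies every element into an immutable `Seq`, so declaring the target field as `collection.Seq` lets the wrapper be stored without that copy.

```scala
import scala.jdk.CollectionConverters._  // Scala 2.13 converter API

object ToSeqCopySketch {
  def main(args: Array[String]): Unit = {
    val metrics = new java.util.ArrayList[String]()
    (1 to 1000).foreach(i => metrics.add(s"metric-$i"))

    val wrapped: collection.Seq[String] = metrics.asScala  // O(1): a view over the Java list
    val copied: Seq[String] = metrics.asScala.toSeq        // O(n): all 1000 elements copied on 2.13

    // The wrapper tracks later changes to the underlying Java list; the copy does not.
    metrics.add("metric-1001")
    println(wrapped.length)  // 1001
    println(copied.length)   // 1000
  }
}
```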

val jobs = ui.getJobsMap.asScala.map {
case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
}.toMap