@@ -421,3 +421,38 @@ message SparkPlanGraphWrapper {
  repeated SparkPlanGraphNodeWrapper nodes = 2;
  repeated SparkPlanGraphEdge edges = 3;
}

message RDDOperationEdge {
  int32 from_id = 1;
  int32 to_id = 2;
}

message RDDOperationNode {
  enum DeterministicLevel {
    UNSPECIFIED = 0;
    DETERMINATE = 1;
    UNORDERED = 2;
    INDETERMINATE = 3;
  }
  int32 id = 1;
  string name = 2;
  bool cached = 3;
  bool barrier = 4;
  string callsite = 5;
  DeterministicLevel output_deterministic_level = 6;
}

message RDDOperationClusterWrapper {
  string id = 1;
  string name = 2;
  repeated RDDOperationNode child_nodes = 3;
  repeated RDDOperationClusterWrapper child_clusters = 4;
}

message RDDOperationGraphWrapper {
  int64 stage_id = 1;
  repeated RDDOperationEdge edges = 2;
  repeated RDDOperationEdge outgoing_edges = 3;
  repeated RDDOperationEdge incoming_edges = 4;
  RDDOperationClusterWrapper root_cluster = 5;
}
@@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
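
The line added above registers the new serializer in a META-INF/services provider-configuration file for ProtobufSerDe. The sketch below illustrates the general java.util.ServiceLoader mechanism behind such files; the MySerDe trait is a hypothetical stand-in, not Spark's real trait, and the exact way KVStoreProtobufSerializer discovers implementations is an assumption here.

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical stand-in for org.apache.spark.status.protobuf.ProtobufSerDe,
// used only to show how ServiceLoader picks up registered implementations.
trait MySerDe {
  def supportClass: Class[_]
}

object ServiceLoaderSketch {
  def main(args: Array[String]): Unit = {
    // ServiceLoader scans every META-INF/services/<fully.qualified.trait.name>
    // file on the classpath and instantiates each concrete class listed there,
    // one fully qualified class name per line -- which is why the new
    // serializer's class name is appended to the file above.
    val implementations = ServiceLoader.load(classOf[MySerDe]).asScala
    implementations.foreach(serde => println(serde.supportClass.getName))
  }
}
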
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.status.protobuf

import scala.collection.JavaConverters._

import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}

class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {

  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]

  override def serialize(input: Any): Array[Byte] = {
    val op = input.asInstanceOf[RDDOperationGraphWrapper]
    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
    builder.setStageId(op.stageId.toLong)
    op.edges.foreach { e =>
      builder.addEdges(serializeRDDOperationEdge(e))
    }
    op.outgoingEdges.foreach { e =>
      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
    }
    op.incomingEdges.foreach { e =>
      builder.addIncomingEdges(serializeRDDOperationEdge(e))
    }
    builder.setRootCluster(serializeRDDOperationClusterWrapper(op.rootCluster))
    builder.build().toByteArray
  }

  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
    new RDDOperationGraphWrapper(
      stageId = wrapper.getStageId.toInt,
      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
Review thread on this line:

Contributor (@LuciferYang):
I found many redundant toSeq calls for Scala 2.12 in the status.protobuf package; they exist for Scala 2.13 compatibility, because Seq means collection.Seq in Scala 2.12 but immutable.Seq in Scala 2.13.

The conversion does not affect Scala 2.12, but it makes Scala 2.13 perform worse than Scala 2.12. Since these are internal Spark definitions, I suggest declaring them explicitly as scala.collection.Seq so there is no performance difference between Scala 2.12 and Scala 2.13. Do you think it's ok? @gengliangwang

Contributor Author:
@LuciferYang Agree, and since these are private[spark], it shouldn't be an issue.

Member:
"explicitly defining them as scala.collection.Seq to make no performance difference" -- @LuciferYang could you explain details?

Contributor (@LuciferYang, Dec 28, 2022):
https://github.com/apache/spark/pull/39215/files is doing some refactor work, which does not block this one.

      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
    )
  }

  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
    StoreTypes.RDDOperationClusterWrapper = {
    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
    builder.setId(op.id)
    builder.setName(op.name)
    op.childNodes.foreach { node =>
      builder.addChildNodes(serializeRDDOperationNode(node))
    }
    op.childClusters.foreach { cluster =>
      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
    }
    builder.build()
  }

  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
    RDDOperationClusterWrapper = {
    new RDDOperationClusterWrapper(
      id = op.getId,
      name = op.getName,
      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
      childClusters =
        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
    )
  }

  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
    val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
      .valueOf(node.outputDeterministicLevel.toString)
    val builder = StoreTypes.RDDOperationNode.newBuilder()
    builder.setId(node.id)
    builder.setName(node.name)
    builder.setCached(node.cached)
    builder.setBarrier(node.barrier)
    builder.setCallsite(node.callsite)
    builder.setOutputDeterministicLevel(outputDeterministicLevel)
    builder.build()
  }

  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
    RDDOperationNode(
      id = node.getId,
      name = node.getName,
      cached = node.getCached,
      barrier = node.getBarrier,
      callsite = node.getCallsite,
      outputDeterministicLevel =
        DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
    )
  }

  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
    val builder = StoreTypes.RDDOperationEdge.newBuilder()
    builder.setFromId(edge.fromId)
    builder.setToId(edge.toId)
    builder.build()
  }

  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
    RDDOperationEdge(
      fromId = edge.getFromId,
      toId = edge.getToId)
  }
}
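
As a side note to the review thread above about the trailing toSeq calls: the sketch below is a minimal, self-contained illustration of the point, assuming two hypothetical case classes (EdgesCopying and EdgesNoCopy, which are not Spark types). It shows why toSeq is required when a field is declared as plain Seq, and why declaring it as scala.collection.Seq avoids the extra copy on Scala 2.13.

import scala.collection.JavaConverters._

object SeqCompatSketch {
  // With plain Seq, Scala 2.12 reads scala.collection.Seq (the Buffer produced
  // by asScala already conforms, so toSeq is essentially a no-op there), while
  // Scala 2.13 reads scala.collection.immutable.Seq, so toSeq copies the Buffer.
  case class EdgesCopying(edgeIds: Seq[Int])

  // Declaring the field as scala.collection.Seq compiles without toSeq on both
  // versions and avoids the extra copy on 2.13.
  case class EdgesNoCopy(edgeIds: scala.collection.Seq[Int])

  def main(args: Array[String]): Unit = {
    // A mutable.Buffer[Int], as returned by asScala on a java.util.List.
    val fromJava = java.util.Arrays.asList(1, 2, 3).asScala.map(_ * 2)

    val copying = EdgesCopying(fromJava.toSeq) // copies on 2.13, cheap on 2.12
    val noCopy = EdgesNoCopy(fromJava)         // no copy on either version

    println(copying.edgeIds)
    println(noCopy.edgeIds)
  }
}
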
@@ -22,9 +22,11 @@ import java.util.Date
import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
import org.apache.spark.status._
import org.apache.spark.status.api.v1._
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}

class KVStoreProtobufSerializerSuite extends SparkFunSuite {
  private val serializer = new KVStoreProtobufSerializer()
@@ -773,4 +775,86 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
      assert(result.info.processLogs(k) == input.info.processLogs(k))
    }
  }

  test("RDD Operation Graph") {
    val input = new RDDOperationGraphWrapper(
      stageId = 1,
      edges = Seq(
        RDDOperationEdge(fromId = 2, toId = 3)
      ),
      outgoingEdges = Seq(
        RDDOperationEdge(fromId = 4, toId = 5),
        RDDOperationEdge(fromId = 6, toId = 7)
      ),
      incomingEdges = Seq(
        RDDOperationEdge(fromId = 8, toId = 9),
        RDDOperationEdge(fromId = 10, toId = 11),
        RDDOperationEdge(fromId = 12, toId = 13)
      ),
      rootCluster = new RDDOperationClusterWrapper(
        id = "id_1",
        name = "name1",
        childNodes = Seq(
          RDDOperationNode(
            id = 14,
            name = "name2",
            cached = true,
            barrier = false,
            callsite = "callsite_1",
            outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
        childClusters = Seq(new RDDOperationClusterWrapper(
          id = "id_1",
          name = "name1",
          childNodes = Seq(
            RDDOperationNode(
              id = 15,
              name = "name3",
              cached = false,
              barrier = true,
              callsite = "callsite_2",
              outputDeterministicLevel = DeterministicLevel.UNORDERED)),
          childClusters = Seq.empty
        ))
      )
    )
    val bytes = serializer.serialize(input)
    val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])

    assert(result.stageId == input.stageId)
    assert(result.edges.size == input.edges.size)
    result.edges.zip(input.edges).foreach { case (e1, e2) =>
      assert(e1.fromId == e2.fromId)
      assert(e1.toId == e2.toId)
    }
    assert(result.outgoingEdges.size == input.outgoingEdges.size)
    result.outgoingEdges.zip(input.outgoingEdges).foreach { case (e1, e2) =>
      assert(e1.fromId == e2.fromId)
      assert(e1.toId == e2.toId)
    }
    assert(result.incomingEdges.size == input.incomingEdges.size)
    result.incomingEdges.zip(input.incomingEdges).foreach { case (e1, e2) =>
      assert(e1.fromId == e2.fromId)
      assert(e1.toId == e2.toId)
    }

    def compareClusters(c1: RDDOperationClusterWrapper, c2: RDDOperationClusterWrapper): Unit = {
      assert(c1.id == c2.id)
      assert(c1.name == c2.name)
      assert(c1.childNodes.size == c2.childNodes.size)
      c1.childNodes.zip(c2.childNodes).foreach { case (n1, n2) =>
        assert(n1.id == n2.id)
        assert(n1.name == n2.name)
        assert(n1.cached == n2.cached)
        assert(n1.barrier == n2.barrier)
        assert(n1.callsite == n2.callsite)
        assert(n1.outputDeterministicLevel == n2.outputDeterministicLevel)
      }
      assert(c1.childClusters.size == c2.childClusters.size)
      c1.childClusters.zip(c2.childClusters).foreach {
        case (_c1, _c2) => compareClusters(_c1, _c2)
      }
    }

    compareClusters(result.rootCluster, input.rootCluster)
  }
}