@@ -355,3 +355,24 @@ message ExecutorSummary {
message ExecutorSummaryWrapper {
ExecutorSummary info = 1;
}

message SQLPlanMetric {
Contributor Author:
Should we define it in the core module? Or would it be better to put it in the sql module?

Member:
It is OK to define it in the core module; there is a "SQL" concept in the core module anyway.
Can we skip shading protobuf in the SQL module this way?

Contributor Author:
If we define all proto messages in the core module, I think we can skip shading protobuf in the SQL module: the generated message code can find the corresponding shaded and relocated dependencies in the core module.

string name = 1;
int64 accumulator_id = 2;
string metric_type = 3;
}

message SQLExecutionUIData {
int64 execution_id = 1;
string description = 2;
string details = 3;
string physical_plan_description = 4;
map<string, string> modified_configs = 5;
repeated SQLPlanMetric metrics = 6;
int64 submission_time = 7;
optional int64 completion_time = 8;
optional string error_message = 9;
map<int64, JobExecutionStatus> jobs = 10;
repeated int64 stages = 11;
map<int64, string> metric_values = 12;
}
5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -147,6 +147,11 @@
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm9-shaded</artifactId>
</dependency>
<dependency>
Member:
This is not required, right?

Contributor Author (@LuciferYang, Dec 23, 2022):

For sbt, it is not required:

[info] KVStoreProtobufSerializerSuite:
[info] - SQLExecutionUIData (121 milliseconds)
[info] Run completed in 1 second, 648 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 293 s (04:53), completed 2022-12-23 9:41:14

But for Maven, it is required for now:

mvn clean test -pl sql/core -am -Dtest=none -DwildcardSuites=org.apache.spark.status.protobuf.sql.KVStoreProtobufSerializerSuite
[INFO] --- scala-maven-plugin:4.8.0:compile (scala-compile-first) @ spark-sql_2.12 ---
[INFO] Compiler bridge file: /Users/yangjie01/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.8.0-bin_2.12.17__52.0-1.8.0_20221110T195421.jar
[INFO] compiler plugin: BasicArtifact(com.github.ghik,silencer-plugin_2.12.17,1.7.10,null)
[INFO] compiling 576 Scala sources and 75 Java sources to /spark-source/sql/core/target/scala-2.12/classes ...
[ERROR] [Error] /spark-source/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala:35: Class com.google.protobuf.GeneratedMessageV3 not found - continuing with a stub.
[ERROR] [Error] : Unable to locate class corresponding to inner class entry for Builder in owner com.google.protobuf.GeneratedMessageV3
[ERROR] [Error] /spark-source/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala:43: Class com.google.protobuf.GeneratedMessageV3 not found - continuing with a stub.
[ERROR] [Error] /spark-source/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala:55: value toByteArray is not a member of org.apache.spark.status.protobuf.StoreTypes.SQLExecutionUIData
[ERROR] [Error] : Unable to locate class corresponding to inner class entry for Builder in owner com.google.protobuf.GeneratedMessageV3
[ERROR] 5 errors found

This is because, without this change, the sql module inherits protobuf-java 2.5.0 with compile scope from the parent pom.xml. We can verify this from the attached sql-module-dependency-tree.txt.
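
For reference, one way to confirm which protobuf-java version sql/core resolves is a filtered dependency tree (a hedged example; the attached sql-module-dependency-tree.txt was presumably produced with something similar):

mvn dependency:tree -pl sql/core -am -Dincludes=com.google.protobuf:protobuf-java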

Contributor Author:
#39164 (comment)

If the version in the place we mentioned yesterday can be changed to 3.21.11, then we can remove this declaration.
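
As a rough sketch of what that might look like, assuming the managed version comes from a protobuf.version property in the parent pom.xml (the exact location referenced above is not shown here):

<properties>
  <!-- Hypothetical: bump the managed protobuf version so sql/core no longer needs its own declaration. -->
  <protobuf.version>3.21.11</protobuf.version>
</properties>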

Member:

ok, do we need to shade it?

Contributor Author (@LuciferYang, Dec 23, 2022):

It doesn't need to be shaded. I built a Spark client with this PR for testing, and it can be parsed normally.

EDIT: ran TPCDSQueryBenchmark with the spark client and checked the UI.

Member:

Hmmmm, did you test with both Maven and SBT? Then why does the core module require shading?

Contributor Author (@LuciferYang, Dec 23, 2022):

Yes, tested with both Maven and SBT.

Personally, I think the core module needs shading and relocation because we don't want the protobuf version that Spark depends on to affect other third-party projects, just like the connect and protobuf modules. The SQL module only uses the protobuf classes generated in the core module, so it does not need to be shaded and relocated again.

In fact, because Spark already uses a unified protobuf version, the connect and protobuf modules only need to relocate the protobuf package to stay consistent with the relocation rules of the core module, rather than shading protobuf-java again.
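
As a rough illustration of the relocation being discussed, a maven-shade-plugin rule of this shape rewrites protobuf references into a Spark-internal package (a sketch only; the concrete shadedPattern used by the core module is an assumption here):

<relocation>
  <!-- Illustrative target package; the actual relocated package name may differ. -->
  <pattern>com.google.protobuf</pattern>
  <shadedPattern>org.sparkproject.spark_core.protobuf</shadedPattern>
</relocation>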

<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
@@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import java.util.Date

import scala.collection.JavaConverters._

import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.Utils.getOptional

class SQLExecutionUIDataSerializer extends ProtobufSerDe {

override val supportClass: Class[_] = classOf[SQLExecutionUIData]

override def serialize(input: Any): Array[Byte] = {
val ui = input.asInstanceOf[SQLExecutionUIData]
val builder = StoreTypes.SQLExecutionUIData.newBuilder()
builder.setExecutionId(ui.executionId)
builder.setDescription(ui.description)
builder.setDetails(ui.details)
builder.setPhysicalPlanDescription(ui.physicalPlanDescription)
ui.modifiedConfigs.foreach {
case (k, v) => builder.putModifiedConfigs(k, v)
}
ui.metrics.foreach(m => builder.addMetrics(SQLPlanMetricSerializer.serialize(m)))
builder.setSubmissionTime(ui.submissionTime)
ui.completionTime.foreach(ct => builder.setCompletionTime(ct.getTime))
ui.errorMessage.foreach(builder.setErrorMessage)
ui.jobs.foreach {
case (id, status) =>
builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
}
ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
val metricValues = ui.metricValues
if (metricValues != null) {
Contributor Author (@LuciferYang, Dec 23, 2022):
Found a corner case: metricValues may be null, and there will be an NPE without a null check.

Contributor Author:
@volatile var metricsValues: Map[Long, String] = null
// Just in case job end and execution end arrive out of order, keep track of how many
// end events arrived so that the listener can stop tracking the execution.
val endEvents = new AtomicInteger()
override protected def doUpdate(): Any = {
new SQLExecutionUIData(
executionId,
description,
details,
physicalPlanDescription,
modifiedConfigs,
metrics,
submissionTime,
completionTime,
errorMessage,
jobs,
stages,
metricsValues)

Alternatively, we could guarantee that metricsValues is not null at line 498 or line 517, but because I am not sure whether that would affect existing scenarios (like JSON), I think we can try that in another PR.
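
A minimal sketch of that alternative guard, reusing the metricsValues field from the snippet above (hypothetical; where exactly to apply it in the listener is left to the follow-up mentioned):

// Normalize the possibly-null map before it ever reaches a serializer.
val safeMetricValues: Map[Long, String] =
  if (metricsValues != null) metricsValues else Map.empty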

metricValues.foreach {
case (k, v) => builder.putMetricValues(k, v)
}
}
builder.build().toByteArray
}

override def deserialize(bytes: Array[Byte]): SQLExecutionUIData = {
val ui = StoreTypes.SQLExecutionUIData.parseFrom(bytes)
val completionTime =
getOptional(ui.hasCompletionTime, () => new Date(ui.getCompletionTime))
val errorMessage = getOptional(ui.hasErrorMessage, () => ui.getErrorMessage)
val metrics =
ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)).toSeq
val jobs = ui.getJobsMap.asScala.map {
case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
}.toMap
val metricValues = ui.getMetricValuesMap.asScala.map {
case (k, v) => k.toLong -> v
}.toMap

new SQLExecutionUIData(
executionId = ui.getExecutionId,
description = ui.getDescription,
details = ui.getDetails,
physicalPlanDescription = ui.getPhysicalPlanDescription,
modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
metrics = metrics,
submissionTime = ui.getSubmissionTime,
completionTime = completionTime,
errorMessage = errorMessage,
jobs = jobs,
stages = ui.getStagesList.asScala.map(_.toInt).toSet,
metricValues = metricValues
)
}
}
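
For context, the deserialize path relies on Utils.getOptional to turn protobuf hasX/getX pairs into Scala Options. Judging from the call sites above, the helper plausibly looks like the following (a sketch, not necessarily the actual implementation in org.apache.spark.status.protobuf.Utils):

// Returns Some(result()) only when the protobuf field is present.
def getOptional[T](condition: Boolean, result: () => T): Option[T] =
  if (condition) Some(result()) else None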
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.status.protobuf.StoreTypes

object SQLPlanMetricSerializer {

def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
StoreTypes.SQLPlanMetric.newBuilder()
.setName(metric.name)
.setAccumulatorId(metric.accumulatorId)
.setMetricType(metric.metricType)
.build()
}

def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
}
}
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf.sql

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.status.api.v1.sql.SqlResourceSuite
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

class KVStoreProtobufSerializerSuite extends SparkFunSuite {

private val serializer = new KVStoreProtobufSerializer()

test("SQLExecutionUIData") {
val input = SqlResourceSuite.sqlExecutionUIData
val bytes = serializer.serialize(input)
val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
assert(result.executionId == input.executionId)
assert(result.description == input.description)
assert(result.details == input.details)
assert(result.physicalPlanDescription == input.physicalPlanDescription)
assert(result.modifiedConfigs == input.modifiedConfigs)
assert(result.metrics == input.metrics)
assert(result.submissionTime == input.submissionTime)
assert(result.completionTime == input.completionTime)
assert(result.errorMessage == input.errorMessage)
assert(result.jobs == input.jobs)
assert(result.stages == input.stages)
assert(result.metricValues == input.metricValues)
}

test("SQLExecutionUIData with metricValues is empty map and null") {
val templateData = SqlResourceSuite.sqlExecutionUIData

val input1 = new SQLExecutionUIData(
executionId = templateData.executionId,
description = templateData.description,
details = templateData.details,
physicalPlanDescription = templateData.physicalPlanDescription,
modifiedConfigs = templateData.modifiedConfigs,
metrics = templateData.metrics,
submissionTime = templateData.submissionTime,
completionTime = templateData.completionTime,
errorMessage = templateData.errorMessage,
jobs = templateData.jobs,
stages = templateData.stages,
metricValues = Map.empty
)
val bytes1 = serializer.serialize(input1)
val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
// input.metricValues is empty map, result.metricValues is empty map.
assert(result1.metricValues.isEmpty)

val input2 = new SQLExecutionUIData(
executionId = templateData.executionId,
description = templateData.description,
details = templateData.details,
physicalPlanDescription = templateData.physicalPlanDescription,
modifiedConfigs = templateData.modifiedConfigs,
metrics = templateData.metrics,
submissionTime = templateData.submissionTime,
completionTime = templateData.completionTime,
errorMessage = templateData.errorMessage,
jobs = templateData.jobs,
stages = templateData.stages,
metricValues = null
)
val bytes2 = serializer.serialize(input2)
val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
// input.metricValues is null, result.metricValues is also empty map.
Contributor Author:
@gengliangwang For collection types, when the input is null, the result after deserialization is an empty collection, which may be different from the JSON behavior.

Member:
Returning an empty collection seems safer. It won't affect the UI page anyway. Thanks for noticing it.

Contributor Author:
OK

assert(result2.metricValues.isEmpty)
}
}