@@ -49,7 +49,7 @@ public KVStoreSerializer() {
this.mapper = new ObjectMapper();
}

public final byte[] serialize(Object o) throws Exception {
public byte[] serialize(Object o) throws Exception {
if (o instanceof String) {
return ((String) o).getBytes(UTF_8);
} else {
@@ -62,7 +62,7 @@ public final byte[] serialize(Object o) throws Exception {
}

@SuppressWarnings("unchecked")
public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
if (klass.equals(String.class)) {
return (T) new String(data, UTF_8);
} else {
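
Dropping final from serialize and deserialize is what allows a protobuf-based serializer to subclass KVStoreSerializer and override both methods. A minimal sketch of such a subclass, dispatching to the JobDataWrapperSerializer added later in this diff; the class name and dispatch logic here are illustrative only, not the actual KVStoreProtobufSerializer from this PR:

// Illustrative sketch. Lives in an org.apache.spark package because
// JobDataWrapper is private[spark].
package org.apache.spark.status.protobuf

import org.apache.spark.status.JobDataWrapper
import org.apache.spark.util.kvstore.KVStoreSerializer

class ExampleProtobufSerializer extends KVStoreSerializer {

  // Route known wrapper types to their protobuf serializers; fall back to the
  // default JSON+GZip path inherited from KVStoreSerializer for everything else.
  override def serialize(o: Object): Array[Byte] = o match {
    case j: JobDataWrapper => JobDataWrapperSerializer.serialize(j)
    case other => super.serialize(other)
  }

  override def deserialize[T](data: Array[Byte], klass: Class[T]): T =
    if (klass == classOf[JobDataWrapper]) {
      JobDataWrapperSerializer.deserialize(data).asInstanceOf[T]
    } else {
      super.deserialize(data, klass)
    }
}
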
1 change: 0 additions & 1 deletion connector/connect/pom.xml
@@ -32,7 +32,6 @@
<url>https://spark.apache.org/</url>
<properties>
<sbt.project.name>connect</sbt.project.name>
<protobuf.version>3.21.1</protobuf.version>
<guava.version>31.0.1-jre</guava.version>
<guava.failureaccess.version>1.0.1</guava.failureaccess.version>
<io.grpc.version>1.47.0</io.grpc.version>
1 change: 0 additions & 1 deletion connector/protobuf/pom.xml
@@ -28,7 +28,6 @@
<artifactId>spark-protobuf_2.12</artifactId>
<properties>
<sbt.project.name>protobuf</sbt.project.name>
<protobuf.version>3.21.9</protobuf.version>
</properties>
<packaging>jar</packaging>
<name>Spark Protobuf</name>
51 changes: 50 additions & 1 deletion core/pom.xml
@@ -532,7 +532,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -616,6 +621,50 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Contributor:
SPARK-40593 and SPARK-41215 did some work to support compiling Spark 3.4 on CentOS 6 and 7. I think the core module needs similar work, but maybe not in this PR.

Contributor:
@WolverineJiang Are you interested in solving this problem after the PR is merged? Just like what you did in SPARK-41215.

Contributor:
> @WolverineJiang Are you interested in solving this problem after the PR is merged? Just like what you did in SPARK-41215.

More than willing, I'll start in a few days.

<artifactId>maven-shade-plugin</artifactId>
Contributor:
@gengliangwang It actually conflicts with the parent pom.xml, which means Jetty cannot be shaded into spark-core. The current master branch cannot start the Spark UI; see the error message:

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/09 14:58:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
java.lang.NoClassDefFoundError: org/eclipse/jetty/util/thread/Scheduler
  at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:67)
  at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:223)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:484)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2739)
  at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:978)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:972)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:106)
  ... 55 elided
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.util.thread.Scheduler
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  ... 63 more
<console>:14: error: not found: value spark
       import spark.implicits._
              ^

Member Author:
@ulysses-you Thanks for reporting it. Do you know how to fix it?

Contributor:
Maybe we should add combine.children="append" to the <relocations> tag.

Member (@pan3793, Dec 9, 2022):
Sent a PR to fix it. #38999

Member:
Ya, I also saw this when I built distributions. Thank you, @pan3793!

<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>com.google.protobuf:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>${spark.shade.packageName}.spark-core.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
Contributor (@cloud-fan, Nov 25, 2022):
Is this version also consistent in all places (Spark Connect, protobuf connector)?

Contributor:
Spark Connect uses protobuf-maven-plugin.

Member Author:
The protobuf connector uses the same plugin. I just unified the version in one variable.

<!-- Generates Java classes for tests. TODO(Raghu): Generate descriptor files too. -->
<executions>
<execution>
<phase>generate-test-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
<protocVersion>${protobuf.version}</protocVersion>
<inputDirectories>
<include>src/main/protobuf</include>
</inputDirectories>
<addSources>test</addSources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";
package org.apache.spark.status.protobuf;

message JavaDate {
// The number of milliseconds since January 1, 1970, 00:00:00 GMT
optional int64 date = 1;
Contributor:
Shall we use int64 directly instead of having JavaDate? Milliseconds since the epoch is a very common definition and not tied to Java.

Member Author:
We will need utility methods for encoding/decoding Java dates in the future anyway. I prefer the current implementation; it looks more graceful.

Contributor:
JavaDate is not really a data structure. Here it's more like a marker to indicate that the deserializer should turn it into a Java Date. I think this should be the responsibility of JobDataWrapperSerializer, not the proto message.

Member Author:
OK, changed to int64.

}

enum JobExecutionStatus {
RUNNING = 0;
Contributor:
Do we need to add UNSPECIFIED to follow the protobuf style guidelines? cc @amaliujia

Contributor:
Probably it's fine, as these proto messages are only used by Spark internally.

Member Author:
I didn't get it. There is an UNKNOWN status already, so there is always a value set for JobExecutionStatus.

Contributor:
FYI, the official style guide recommends reserving the value 0 for UNSPECIFIED: https://developers.google.com/protocol-buffers/docs/style#enums

Contributor:
+1, please reserve 0 for unspecified to allow for evolution.

Member Author:
Sure, updated.

SUCCEEDED = 1;
FAILED = 2;
UNKNOWN = 3;
}

message JobData {
int32 job_id = 1;
Contributor:
Discuss: since we are defining the schema here, do we want to make all IDs (RDD, stage, job, etc.) int64 instead of int32? Unlike JSON, this will be a pain to change later in protobuf if we need to. (Example, see this PR.)

Member Author:
OK, changed to int64.

string name = 2;
optional string description = 3;
optional JavaDate submission_time = 4;
optional JavaDate completion_time = 5;
repeated int32 stage_ids = 6;
optional string job_group = 7;
JobExecutionStatus status = 8;
int32 num_tasks = 9;
int32 num_active_tasks = 10;
int32 num_completed_tasks = 11;
int32 num_skipped_tasks = 12;
int32 num_failed_tasks = 13;
int32 num_killed_tasks = 14;
int32 num_completed_indices = 15;
int32 num_active_stages = 16;
int32 num_completed_stages = 17;
int32 num_skipped_stages = 18;
int32 num_failed_stages = 19;
map<string, int32> kill_tasks_summary = 20;
}

message JobDataWrapper {
JobData info = 1;
repeated int32 skipped_stages = 2;
optional int64 sql_execution_id = 3;
}
@@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// Visible for testing.
private[history] val listing: KVStore = {
KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
KVUtils.createKVStore(storePath, isLive = false, conf)
}

private val diskManager = storePath.map { path =>
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -773,10 +772,7 @@ private[spark] object AppStatusStore {
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
// For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
// instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
// improvements on writes and reads.
val kvStore = KVUtils.createKVStore(storePath, HybridStoreDiskBackend.ROCKSDB, conf)
val kvStore = KVUtils.createKVStore(storePath, isLive = true, conf)
val store = new ElementTrackingStore(kvStore, conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
new AppStatusStore(store, listener = Some(listener))
34 changes: 27 additions & 7 deletions core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore._

@@ -71,12 +72,14 @@
path: File,
metadata: M,
conf: SparkConf,
diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
diskBackend: Option[HybridStoreDiskBackend.Value] = None,
serializer: Option[KVStoreSerializer] = None): KVStore = {
require(metadata != null, "Metadata is required.")

val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
val db = diskBackend.getOrElse(backend(conf)) match {
case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
case LEVELDB => new LevelDB(path, kvSerializer)
case ROCKSDB => new RocksDB(path, kvSerializer)
}
val dbMeta = db.getMetadata(classTag[M].runtimeClass)
if (dbMeta == null) {
@@ -91,9 +94,26 @@

def createKVStore(
storePath: Option[File],
diskBackend: HybridStoreDiskBackend.Value,
isLive: Boolean,
conf: SparkConf): KVStore = {
storePath.map { path =>
val diskBackend = if (isLive) {
// For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
// instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
// improvements on writes and reads.
HybridStoreDiskBackend.ROCKSDB
} else {
HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
}

val serializer = if (isLive) {
// For the disk-based KV store of live UI, let's simply use protobuf serializer only.
// The default serializer is slow since it is using JSON+GZip encoding.
Some(new KVStoreProtobufSerializer())
} else {
None
}

val dir = diskBackend match {
case LEVELDB => "listing.ldb"
case ROCKSDB => "listing.rdb"
@@ -108,20 +128,20 @@
conf.get(History.HISTORY_LOG_DIR))

try {
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
} catch {
// If there's an error, remove the listing database and any existing UI database
// from the store directory, since it's extremely likely that they'll all contain
// incompatible information.
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
// Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
}
}.getOrElse(new InMemoryStore())
}
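
Taken together with the two call sites updated above (FsHistoryProvider and AppStatusStore), the new isLive flag means: the history server keeps the disk backend chosen by History.HYBRID_STORE_DISK_BACKEND with the default serializer, while the live UI is pinned to RocksDB plus the protobuf serializer, falling back to an InMemoryStore when no path is given. A rough usage sketch, with illustrative paths; KVUtils is private[spark], so this would have to live inside an org.apache.spark package:

// Sketch only: mirrors the call sites shown in this diff, paths are made up.
package org.apache.spark.status

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.kvstore.KVStore

object CreateKVStoreExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // History server listing store: disk backend follows
    // History.HYBRID_STORE_DISK_BACKEND, default serializer.
    val listing: KVStore =
      KVUtils.createKVStore(Some(new File("/tmp/history-listing")), isLive = false, conf)

    // Live UI store: always RocksDB with the protobuf serializer when a path
    // is given, otherwise an InMemoryStore.
    val liveOnDisk: KVStore =
      KVUtils.createKVStore(Some(new File("/tmp/live-ui-store")), isLive = true, conf)
    val liveInMemory: KVStore =
      KVUtils.createKVStore(None, isLive = true, conf)
  }
}
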
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf

import collection.JavaConverters._
import java.util.Date

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData

object JobDataWrapperSerializer {
def serialize(j: JobDataWrapper): Array[Byte] = {
val jobData = serializeJobData(j.info)
val builder = StoreTypes.JobDataWrapper.newBuilder()
Contributor:
Hi @gengliangwang, where can we find the StoreTypes object? I cannot locate it; is it auto-generated?

Member Author:
It is generated at compile time. It should be under ./core/target/scala-2.12/src_managed/main/org/apache/spark/status/protobuf.

Contributor:
Thanks

builder.setInfo(jobData)
j.skippedStages.foreach(builder.addSkippedStages)
j.sqlExecutionId.foreach(builder.setSqlExecutionId)
builder.build().toByteArray
}

def deserialize(bytes: Array[Byte]): JobDataWrapper = {
val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes)
val sqlExecutionId = getOptional(wrapper.hasSqlExecutionId, wrapper.getSqlExecutionId)
new JobDataWrapper(
deserializeJobData(wrapper.getInfo),
wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet,
sqlExecutionId
)
}

private def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
Some(result())
} else {
None
}

private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
val jobDataBuilder = StoreTypes.JobData.newBuilder()
jobDataBuilder.setJobId(jobData.jobId)
.setName(jobData.name)
.setStatus(serializeJobExecutionStatus(jobData.status))
.setNumTasks(jobData.numTasks)
.setNumActiveTasks(jobData.numActiveTasks)
.setNumCompletedTasks(jobData.numCompletedTasks)
.setNumSkippedTasks(jobData.numSkippedTasks)
.setNumFailedTasks(jobData.numFailedTasks)
.setNumKilledTasks(jobData.numKilledTasks)
.setNumCompletedIndices(jobData.numCompletedIndices)
.setNumActiveStages(jobData.numActiveStages)
.setNumCompletedStages(jobData.numCompletedStages)
.setNumSkippedStages(jobData.numSkippedStages)
.setNumFailedStages(jobData.numFailedStages)

jobData.description.foreach(jobDataBuilder.setDescription)
jobData.submissionTime.foreach { d =>
jobDataBuilder.setSubmissionTime(serializeDate(d))
}
jobData.completionTime.foreach { d =>
jobDataBuilder.setCompletionTime(serializeDate(d))
}
jobData.stageIds.foreach(jobDataBuilder.addStageIds)
jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
jobData.killedTasksSummary.foreach { entry =>
jobDataBuilder.putKillTasksSummary(entry._1, entry._2)
}
jobDataBuilder.build()
}

private def deserializeJobData(info: StoreTypes.JobData): JobData = {
val description = if (info.hasDescription) {
Some(info.getDescription)
} else {
None
}
val submissionTime =
getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime.getDate))
val completionTime = getOptional(info.hasCompletionTime,
() => new Date(info.getCompletionTime.getDate))
val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
val status = JobExecutionStatus.valueOf(info.getStatus.toString)

new JobData(
jobId = info.getJobId,
name = info.getName,
description = description,
submissionTime = submissionTime,
completionTime = completionTime,
stageIds = info.getStageIdsList.asScala.map(_.toInt),
Contributor:
This needs toSeq for Scala 2.13. Or should we explicitly declare stageIds in JobData as scala.collection.Seq?

Member Author:
Added toSeq.

jobGroup = jobGroup,
status = status,
numTasks = info.getNumTasks,
numActiveTasks = info.getNumActiveTasks,
numCompletedTasks = info.getNumCompletedTasks,
numSkippedTasks = info.getNumSkippedTasks,
numFailedTasks = info.getNumFailedTasks,
numKilledTasks = info.getNumKilledTasks,
numCompletedIndices = info.getNumCompletedIndices,
numActiveStages = info.getNumActiveStages,
numCompletedStages = info.getNumCompletedStages,
numSkippedStages = info.getNumSkippedStages,
numFailedStages = info.getNumFailedStages,
killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
}

private def serializeDate(d: Date): StoreTypes.JavaDate = {
StoreTypes.JavaDate.newBuilder().setDate(d.getTime).build()
}

private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = {
StoreTypes.JobExecutionStatus.valueOf(j.toString)
}
}
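
Since StoreTypes is code generated by protoc at build time (see the protoc-jar-maven-plugin configuration added to core/pom.xml above), the generated builders can also be exercised directly. A small, illustrative round trip with the generated classes, assuming the message definitions shown earlier in this diff; the field values are made up:

import org.apache.spark.status.protobuf.StoreTypes

object StoreTypesRoundTripExample {
  def main(args: Array[String]): Unit = {
    // Build a JobData message with the generated builder; setters mirror the
    // field names in the .proto definition above.
    val job = StoreTypes.JobData.newBuilder()
      .setJobId(1)
      .setName("count at <console>:1")
      .setStatus(StoreTypes.JobExecutionStatus.RUNNING)
      .setNumTasks(8)
      .build()

    val wrapper = StoreTypes.JobDataWrapper.newBuilder()
      .setInfo(job)
      .addSkippedStages(3)
      .build()

    // Serialize to bytes and parse back, which is what JobDataWrapperSerializer
    // does on top of mapping to and from the Spark-side classes.
    val bytes: Array[Byte] = wrapper.toByteArray
    val parsed = StoreTypes.JobDataWrapper.parseFrom(bytes)
    assert(parsed.getInfo.getJobId == 1)
    assert(parsed.getSkippedStagesList.get(0).intValue == 3)
  }
}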