@@ -49,7 +49,7 @@ public KVStoreSerializer() {
this.mapper = new ObjectMapper();
}

public final byte[] serialize(Object o) throws Exception {
public byte[] serialize(Object o) throws Exception {
if (o instanceof String) {
return ((String) o).getBytes(UTF_8);
} else {
@@ -62,7 +62,7 @@ public final byte[] serialize(Object o) throws Exception {
}

@SuppressWarnings("unchecked")
public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
if (klass.equals(String.class)) {
return (T) new String(data, UTF_8);
} else {
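Context for dropping the `final` modifiers above: the rest of this PR introduces a protobuf-based KVStoreProtobufSerializer (imported and wired up in KVUtils below) that needs to override these two methods. A minimal Scala sketch of such a subclass, assuming only the JobDataWrapperSerializer added later in this diff; the class name here is hypothetical and the real KVStoreProtobufSerializer may cover more types:

```scala
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.protobuf.JobDataWrapperSerializer
import org.apache.spark.util.kvstore.KVStoreSerializer

// Sketch only: delegate JobDataWrapper to the protobuf serializer and fall back to
// the default JSON+GZip serializer for everything else.
// (It would need to live under org.apache.spark, since JobDataWrapper is package-private.)
class SketchProtobufSerializer extends KVStoreSerializer {
  override def serialize(o: AnyRef): Array[Byte] = o match {
    case j: JobDataWrapper => JobDataWrapperSerializer.serialize(j)
    case other => super.serialize(other)
  }

  override def deserialize[T](data: Array[Byte], klass: Class[T]): T = {
    if (klass == classOf[JobDataWrapper]) {
      JobDataWrapperSerializer.deserialize(data).asInstanceOf[T]
    } else {
      super.deserialize(data, klass)
    }
  }
}
```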
2 changes: 1 addition & 1 deletion connector/protobuf/pom.xml
@@ -122,7 +122,7 @@
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<version>${protoc-jar-maven-plugin.version}</version>
<!-- Generates Java classes for tests. TODO(Raghu): Generate descriptor files too. -->
<executions>
<execution>
49 changes: 48 additions & 1 deletion core/pom.xml
@@ -532,7 +532,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -616,6 +621,48 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Contributor:
SPARK-40593 and SPARK-41215 did some work to support compiling Spark 3.4 on CentOS 6 and 7. I think the core module needs similar work, but maybe not in this PR.

Contributor:
@WolverineJiang Are you interested in solving this problem after this PR is merged, just like what you did in SPARK-41215?

Reply:
More than willing; I'll start in a few days.

<artifactId>maven-shade-plugin</artifactId>
Contributor:
@gengliangwang It actually conflicts with the parent pom.xml, which causes Jetty to not be shaded into spark-core. The current master branch cannot start the Spark UI; see the error message:

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/09 14:58:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
java.lang.NoClassDefFoundError: org/eclipse/jetty/util/thread/Scheduler
  at org.apache.spark.ui.SparkUI.<init>(SparkUI.scala:67)
  at org.apache.spark.ui.SparkUI$.create(SparkUI.scala:223)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:484)
  at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2739)
  at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:978)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:972)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:106)
  ... 55 elided
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.util.thread.Scheduler
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  ... 63 more
<console>:14: error: not found: value spark
       import spark.implicits._
              ^

Member Author:
@ulysses-you Thanks for reporting it. Do you know how to fix it?

Contributor:
Maybe we should add combine.children="append" to the <relocations> tag.

Member (@pan3793, Dec 9, 2022):
Sent a PR to fix it. #38999

Member:
Yeah, I also saw this when I built distributions. Thank you, @pan3793!

<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<shadeTestJar>true</shadeTestJar>
<artifactSet>
<includes>
<include>com.google.protobuf:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>${spark.shade.packageName}.spark-core.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</plugin>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>${protoc-jar-maven-plugin.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
<protocVersion>${protobuf.version}</protocVersion>
<inputDirectories>
<include>src/main/protobuf</include>
</inputDirectories>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";
package org.apache.spark.status.protobuf;

enum JobExecutionStatus {
UNSPECIFIED = 0;
RUNNING = 1;
SUCCEEDED = 2;
FAILED = 3;
UNKNOWN = 4;
}

message JobData {
// All IDs are int64 for extendability, even when they are currently int32 in Spark.
int64 job_id = 1;
string name = 2;
optional string description = 3;
optional int64 submission_time = 4;
optional int64 completion_time = 5;
repeated int64 stage_ids = 6;
optional string job_group = 7;
JobExecutionStatus status = 8;
int32 num_tasks = 9;
int32 num_active_tasks = 10;
int32 num_completed_tasks = 11;
int32 num_skipped_tasks = 12;
int32 num_failed_tasks = 13;
int32 num_killed_tasks = 14;
int32 num_completed_indices = 15;
int32 num_active_stages = 16;
int32 num_completed_stages = 17;
int32 num_skipped_stages = 18;
int32 num_failed_stages = 19;
map<string, int32> kill_tasks_summary = 20;
}

message JobDataWrapper {
JobData info = 1;
repeated int32 skipped_stages = 2;
optional int64 sql_execution_id = 3;
}
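One detail worth noting about the message definitions above: fields declared `optional` get explicit presence accessors (hasDescription(), hasSubmissionTime(), ...) on the generated StoreTypes messages, while plain scalar fields only report default values, and an unset enum reads back as the zero tag UNSPECIFIED. A small Scala sketch against the generated API; the class and accessor names are assumed from the definitions above and from how the serializer later in this diff uses them:

```scala
import org.apache.spark.status.protobuf.StoreTypes

// Build a JobData message with only two fields set.
val job = StoreTypes.JobData.newBuilder()
  .setJobId(1L)
  .setName("count at <console>:23")
  .build()

assert(!job.hasDescription)   // `optional` field: presence is tracked explicitly
assert(job.getNumTasks == 0)  // plain int32 field: unset simply means the default 0
assert(job.getStatus == StoreTypes.JobExecutionStatus.UNSPECIFIED)  // enum defaults to tag 0
```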
@@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// Visible for testing.
private[history] val listing: KVStore = {
KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
KVUtils.createKVStore(storePath, live = false, conf)
}

private val diskManager = storePath.map { path =>
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -773,10 +772,7 @@ private[spark] object AppStatusStore {
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
// For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
// instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
// improvements on writes and reads.
val kvStore = KVUtils.createKVStore(storePath, HybridStoreDiskBackend.ROCKSDB, conf)
val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
val store = new ElementTrackingStore(kvStore, conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
new AppStatusStore(store, listener = Some(listener))
34 changes: 27 additions & 7 deletions core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore._

@@ -71,12 +72,14 @@ private[spark] object KVUtils extends Logging {
path: File,
metadata: M,
conf: SparkConf,
diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
diskBackend: Option[HybridStoreDiskBackend.Value] = None,
serializer: Option[KVStoreSerializer] = None): KVStore = {
require(metadata != null, "Metadata is required.")

val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
val db = diskBackend.getOrElse(backend(conf)) match {
case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
case LEVELDB => new LevelDB(path, kvSerializer)
case ROCKSDB => new RocksDB(path, kvSerializer)
}
val dbMeta = db.getMetadata(classTag[M].runtimeClass)
if (dbMeta == null) {
@@ -91,9 +94,26 @@

def createKVStore(
storePath: Option[File],
diskBackend: HybridStoreDiskBackend.Value,
live: Boolean,
conf: SparkConf): KVStore = {
storePath.map { path =>
val diskBackend = if (live) {
// For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
// instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
// improvements on writes and reads.
HybridStoreDiskBackend.ROCKSDB
} else {
HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
}

val serializer = if (live) {
// For the disk-based KV store of live UI, let's simply use protobuf serializer only.
// The default serializer is slow since it is using JSON+GZip encoding.
Some(new KVStoreProtobufSerializer())
} else {
None
}

val dir = diskBackend match {
case LEVELDB => "listing.ldb"
case ROCKSDB => "listing.rdb"
@@ -108,20 +128,20 @@
conf.get(History.HISTORY_LOG_DIR))

try {
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
} catch {
// If there's an error, remove the listing database and any existing UI database
// from the store directory, since it's extremely likely that they'll all contain
// incompatible information.
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
// Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
open(dbPath, metadata, conf, Some(diskBackend))
open(dbPath, metadata, conf, Some(diskBackend), serializer)
}
}.getOrElse(new InMemoryStore())
}
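Putting the pieces together, the two call sites changed earlier in this diff now read as in the sketch below (behavior as described by the comments above; when storePath is None, both fall back to an InMemoryStore):

```scala
// Live UI store: always RocksDB on disk, serialized with the protobuf serializer.
val liveStore = KVUtils.createKVStore(storePath, live = true, conf)      // AppStatusStore

// History server listing store: keeps the configured HYBRID_STORE_DISK_BACKEND
// and the default JSON+GZip KVStoreScalaSerializer.
val listingStore = KVUtils.createKVStore(storePath, live = false, conf)  // FsHistoryProvider
```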
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf

import collection.JavaConverters._
import java.util.Date

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData

object JobDataWrapperSerializer {
def serialize(j: JobDataWrapper): Array[Byte] = {
val jobData = serializeJobData(j.info)
val builder = StoreTypes.JobDataWrapper.newBuilder()
Contributor:
Hi @gengliangwang, where can we find the StoreTypes object? I cannot locate it; is it auto-generated?

Member Author:
It is generated at compile time. It should be under ./core/target/scala-2.12/src_managed/main/org/apache/spark/status/protobuf.

Contributor:
Thanks

builder.setInfo(jobData)
j.skippedStages.foreach(builder.addSkippedStages)
j.sqlExecutionId.foreach(builder.setSqlExecutionId)
builder.build().toByteArray
}

def deserialize(bytes: Array[Byte]): JobDataWrapper = {
val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes)
val sqlExecutionId = getOptional(wrapper.hasSqlExecutionId, wrapper.getSqlExecutionId)
new JobDataWrapper(
deserializeJobData(wrapper.getInfo),
wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet,
sqlExecutionId
)
}

private def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
Some(result())
} else {
None
}

private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
val jobDataBuilder = StoreTypes.JobData.newBuilder()
jobDataBuilder.setJobId(jobData.jobId.toLong)
.setName(jobData.name)
.setStatus(serializeJobExecutionStatus(jobData.status))
.setNumTasks(jobData.numTasks)
.setNumActiveTasks(jobData.numActiveTasks)
.setNumCompletedTasks(jobData.numCompletedTasks)
.setNumSkippedTasks(jobData.numSkippedTasks)
.setNumFailedTasks(jobData.numFailedTasks)
.setNumKilledTasks(jobData.numKilledTasks)
.setNumCompletedIndices(jobData.numCompletedIndices)
.setNumActiveStages(jobData.numActiveStages)
.setNumCompletedStages(jobData.numCompletedStages)
.setNumSkippedStages(jobData.numSkippedStages)
.setNumFailedStages(jobData.numFailedStages)

jobData.description.foreach(jobDataBuilder.setDescription)
jobData.submissionTime.foreach { d =>
jobDataBuilder.setSubmissionTime(d.getTime)
}
jobData.completionTime.foreach { d =>
jobDataBuilder.setCompletionTime(d.getTime)
}
jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong))
jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
jobData.killedTasksSummary.foreach { entry =>
jobDataBuilder.putKillTasksSummary(entry._1, entry._2)
}
jobDataBuilder.build()
}

private def deserializeJobData(info: StoreTypes.JobData): JobData = {
val description = getOptional(info.hasDescription, info.getDescription)
val submissionTime =
getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime))
val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime))
val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
val status = JobExecutionStatus.valueOf(info.getStatus.toString)

new JobData(
jobId = info.getJobId.toInt,
name = info.getName,
description = description,
submissionTime = submissionTime,
completionTime = completionTime,
stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq,
jobGroup = jobGroup,
status = status,
numTasks = info.getNumTasks,
numActiveTasks = info.getNumActiveTasks,
numCompletedTasks = info.getNumCompletedTasks,
numSkippedTasks = info.getNumSkippedTasks,
numFailedTasks = info.getNumFailedTasks,
numKilledTasks = info.getNumKilledTasks,
numCompletedIndices = info.getNumCompletedIndices,
numActiveStages = info.getNumActiveStages,
numCompletedStages = info.getNumCompletedStages,
numSkippedStages = info.getNumSkippedStages,
numFailedStages = info.getNumFailedStages,
killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
}

private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = {
StoreTypes.JobExecutionStatus.valueOf(j.toString)
}
}
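A small round-trip sketch of the serializer above. The v1.JobData constructor is invoked with the same named parameters that deserializeJobData uses, the sample values are made up, and since JobDataWrapper (and likely the JobData constructor) are package-private, this would have to live under the org.apache.spark package:

```scala
import java.util.Date
import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.status.protobuf.JobDataWrapperSerializer

// Construct a JobData with representative values for every field.
val jobData = new JobData(
  jobId = 1,
  name = "count at <console>:23",
  description = None,
  submissionTime = Some(new Date()),
  completionTime = None,
  stageIds = Seq(0, 1),
  jobGroup = None,
  status = JobExecutionStatus.RUNNING,
  numTasks = 8,
  numActiveTasks = 4,
  numCompletedTasks = 4,
  numSkippedTasks = 0,
  numFailedTasks = 0,
  numKilledTasks = 0,
  numCompletedIndices = 4,
  numActiveStages = 1,
  numCompletedStages = 0,
  numSkippedStages = 0,
  numFailedStages = 0,
  killedTasksSummary = Map.empty)

// Wrap it, serialize to protobuf bytes, and read it back.
val wrapper = new JobDataWrapper(jobData, Set(3), Some(42L))
val bytes = JobDataWrapperSerializer.serialize(wrapper)
val restored = JobDataWrapperSerializer.deserialize(bytes)

assert(restored.info.name == "count at <console>:23")
assert(restored.skippedStages == Set(3))
assert(restored.sqlExecutionId.contains(42L))
```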