diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java index ff99d052cf7a..02dd73e1a2f2 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java @@ -49,7 +49,7 @@ public KVStoreSerializer() { this.mapper = new ObjectMapper(); } - public final byte[] serialize(Object o) throws Exception { + public byte[] serialize(Object o) throws Exception { if (o instanceof String) { return ((String) o).getBytes(UTF_8); } else { @@ -62,7 +62,7 @@ public final byte[] serialize(Object o) throws Exception { } @SuppressWarnings("unchecked") - public final T deserialize(byte[] data, Class klass) throws Exception { + public T deserialize(byte[] data, Class klass) throws Exception { if (klass.equals(String.class)) { return (T) new String(data, UTF_8); } else { diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml index 7057e6148d43..3036fcbf2562 100644 --- a/connector/protobuf/pom.xml +++ b/connector/protobuf/pom.xml @@ -122,7 +122,7 @@ com.github.os72 protoc-jar-maven-plugin - 3.11.4 + ${protoc-jar-maven-plugin.version} diff --git a/core/pom.xml b/core/pom.xml index 97cf2ec9d24a..68a2f0854c1f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -532,7 +532,12 @@ org.apache.commons commons-crypto - + + com.google.protobuf + protobuf-java + ${protobuf.version} + compile + target/scala-${scala.binary.version}/classes @@ -616,6 +621,48 @@ + + org.apache.maven.plugins + maven-shade-plugin + + false + true + + + com.google.protobuf:* + + + + + com.google.protobuf + ${spark.shade.packageName}.spark-core.protobuf + + com.google.protobuf.** + + + + + + + com.github.os72 + protoc-jar-maven-plugin + ${protoc-jar-maven-plugin.version} + + + generate-sources + + run + + + com.google.protobuf:protoc:${protobuf.version} + ${protobuf.version} + + src/main/protobuf + + + + + diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto new file mode 100644 index 000000000000..4ad5d1e75275 --- /dev/null +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; +package org.apache.spark.status.protobuf; + +enum JobExecutionStatus { + UNSPECIFIED = 0; + RUNNING = 1; + SUCCEEDED = 2; + FAILED = 3; + UNKNOWN = 4; +} + +message JobData { + // All IDs are int64 for extendability, even when they are currently int32 in Spark. 
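+ // Note: fields declared `optional` below use proto3 explicit presence, so the generated
+ // Java classes expose hasXxx() accessors (e.g. hasDescription()); the Scala deserializer
+ // relies on these to rebuild Option values.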
+ int64 job_id = 1; + string name = 2; + optional string description = 3; + optional int64 submission_time = 4; + optional int64 completion_time = 5; + repeated int64 stage_ids = 6; + optional string job_group = 7; + JobExecutionStatus status = 8; + int32 num_tasks = 9; + int32 num_active_tasks = 10; + int32 num_completed_tasks = 11; + int32 num_skipped_tasks = 12; + int32 num_failed_tasks = 13; + int32 num_killed_tasks = 14; + int32 num_completed_indices = 15; + int32 num_active_stages = 16; + int32 num_completed_stages = 17; + int32 num_skipped_stages = 18; + int32 num_failed_stages = 19; + map<string, int32> kill_tasks_summary = 20; +} + +message JobDataWrapper { + JobData info = 1; + repeated int32 skipped_stages = 2; + optional int64 sql_execution_id = 3; +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c10bec5c960c..ad4c727c5977 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Visible for testing. private[history] val listing: KVStore = { - KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf) + KVUtils.createKVStore(storePath, live = false, conf) } private val diskManager = storePath.map { path => diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 707c1829440c..ebf52189796a 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} -import org.apache.spark.internal.config.History.HybridStoreDiskBackend import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.api.v1 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID @@ -773,10 +772,7 @@ private[spark] object AppStatusStore { conf: SparkConf, appStatusSource: Option[AppStatusSource] = None): AppStatusStore = { val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_)) - // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now, - // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with - // improvements on writes and reads.
- val kvStore = KVUtils.createKVStore(storePath, HybridStoreDiskBackend.ROCKSDB, conf) + val kvStore = KVUtils.createKVStore(storePath, live = true, conf) val store = new ElementTrackingStore(kvStore, conf) val listener = new AppStatusListener(store, conf, true, appStatusSource) new AppStatusStore(store, listener = Some(listener)) diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala index 182996ac1fb0..42fa25393a3b 100644 --- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala @@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND import org.apache.spark.internal.config.History.HybridStoreDiskBackend import org.apache.spark.internal.config.History.HybridStoreDiskBackend._ +import org.apache.spark.status.protobuf.KVStoreProtobufSerializer import org.apache.spark.util.Utils import org.apache.spark.util.kvstore._ @@ -71,12 +72,14 @@ private[spark] object KVUtils extends Logging { path: File, metadata: M, conf: SparkConf, - diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = { + diskBackend: Option[HybridStoreDiskBackend.Value] = None, + serializer: Option[KVStoreSerializer] = None): KVStore = { require(metadata != null, "Metadata is required.") + val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer()) val db = diskBackend.getOrElse(backend(conf)) match { - case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer()) - case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer()) + case LEVELDB => new LevelDB(path, kvSerializer) + case ROCKSDB => new RocksDB(path, kvSerializer) } val dbMeta = db.getMetadata(classTag[M].runtimeClass) if (dbMeta == null) { @@ -91,9 +94,26 @@ private[spark] object KVUtils extends Logging { def createKVStore( storePath: Option[File], - diskBackend: HybridStoreDiskBackend.Value, + live: Boolean, conf: SparkConf): KVStore = { storePath.map { path => + val diskBackend = if (live) { + // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now, + // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with + // improvements on writes and reads. + HybridStoreDiskBackend.ROCKSDB + } else { + HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND)) + } + + val serializer = if (live) { + // For the disk-based KV store of live UI, let's simply use protobuf serializer only. + // The default serializer is slow since it is using JSON+GZip encoding. 
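+ // KVStoreProtobufSerializer extends the default KVStoreScalaSerializer, so classes
+ // that do not yet have a dedicated protobuf codec (currently everything except
+ // JobDataWrapper) still fall back to the JSON+GZip path.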
+ Some(new KVStoreProtobufSerializer()) + } else { + None + } + val dir = diskBackend match { case LEVELDB => "listing.ldb" case ROCKSDB => "listing.rdb" @@ -108,7 +128,7 @@ private[spark] object KVUtils extends Logging { conf.get(History.HISTORY_LOG_DIR)) try { - open(dbPath, metadata, conf, Some(diskBackend)) + open(dbPath, metadata, conf, Some(diskBackend), serializer) } catch { // If there's an error, remove the listing database and any existing UI database // from the store directory, since it's extremely likely that they'll all contain @@ -116,12 +136,12 @@ private[spark] object KVUtils extends Logging { case _: UnsupportedStoreVersionException | _: MetadataMismatchException => logInfo("Detected incompatible DB versions, deleting...") path.listFiles().foreach(Utils.deleteRecursively) - open(dbPath, metadata, conf, Some(diskBackend)) + open(dbPath, metadata, conf, Some(diskBackend), serializer) case dbExc @ (_: NativeDB.DBException | _: RocksDBException) => // Get rid of the corrupted data and re-create it. logWarning(s"Failed to load disk store $dbPath :", dbExc) Utils.deleteRecursively(dbPath) - open(dbPath, metadata, conf, Some(diskBackend)) + open(dbPath, metadata, conf, Some(diskBackend), serializer) } }.getOrElse(new InMemoryStore()) } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala new file mode 100644 index 000000000000..5c2752fa6ce3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.protobuf + +import collection.JavaConverters._ +import java.util.Date + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.status.JobDataWrapper +import org.apache.spark.status.api.v1.JobData + +object JobDataWrapperSerializer { + def serialize(j: JobDataWrapper): Array[Byte] = { + val jobData = serializeJobData(j.info) + val builder = StoreTypes.JobDataWrapper.newBuilder() + builder.setInfo(jobData) + j.skippedStages.foreach(builder.addSkippedStages) + j.sqlExecutionId.foreach(builder.setSqlExecutionId) + builder.build().toByteArray + } + + def deserialize(bytes: Array[Byte]): JobDataWrapper = { + val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes) + val sqlExecutionId = getOptional(wrapper.hasSqlExecutionId, wrapper.getSqlExecutionId) + new JobDataWrapper( + deserializeJobData(wrapper.getInfo), + wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet, + sqlExecutionId + ) + } + + private def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) { + Some(result()) + } else { + None + } + + private def serializeJobData(jobData: JobData): StoreTypes.JobData = { + val jobDataBuilder = StoreTypes.JobData.newBuilder() + jobDataBuilder.setJobId(jobData.jobId.toLong) + .setName(jobData.name) + .setStatus(serializeJobExecutionStatus(jobData.status)) + .setNumTasks(jobData.numTasks) + .setNumActiveTasks(jobData.numActiveTasks) + .setNumCompletedTasks(jobData.numCompletedTasks) + .setNumSkippedTasks(jobData.numSkippedTasks) + .setNumFailedTasks(jobData.numFailedTasks) + .setNumKilledTasks(jobData.numKilledTasks) + .setNumCompletedIndices(jobData.numCompletedIndices) + .setNumActiveStages(jobData.numActiveStages) + .setNumCompletedStages(jobData.numCompletedStages) + .setNumSkippedStages(jobData.numSkippedStages) + .setNumFailedStages(jobData.numFailedStages) + + jobData.description.foreach(jobDataBuilder.setDescription) + jobData.submissionTime.foreach { d => + jobDataBuilder.setSubmissionTime(d.getTime) + } + jobData.completionTime.foreach { d => + jobDataBuilder.setCompletionTime(d.getTime) + } + jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong)) + jobData.jobGroup.foreach(jobDataBuilder.setJobGroup) + jobData.killedTasksSummary.foreach { entry => + jobDataBuilder.putKillTasksSummary(entry._1, entry._2) + } + jobDataBuilder.build() + } + + private def deserializeJobData(info: StoreTypes.JobData): JobData = { + val description = getOptional(info.hasDescription, info.getDescription) + val submissionTime = + getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime)) + val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime)) + val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup) + val status = JobExecutionStatus.valueOf(info.getStatus.toString) + + new JobData( + jobId = info.getJobId.toInt, + name = info.getName, + description = description, + submissionTime = submissionTime, + completionTime = completionTime, + stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq, + jobGroup = jobGroup, + status = status, + numTasks = info.getNumTasks, + numActiveTasks = info.getNumActiveTasks, + numCompletedTasks = info.getNumCompletedTasks, + numSkippedTasks = info.getNumSkippedTasks, + numFailedTasks = info.getNumFailedTasks, + numKilledTasks = info.getNumKilledTasks, + numCompletedIndices = info.getNumCompletedIndices, + numActiveStages = info.getNumActiveStages, + numCompletedStages = info.getNumCompletedStages, + numSkippedStages 
= info.getNumSkippedStages, + numFailedStages = info.getNumFailedStages, + killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap) + } + + private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = { + StoreTypes.JobExecutionStatus.valueOf(j.toString) + } +} diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala new file mode 100644 index 000000000000..2173821a219a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import org.apache.spark.status.JobDataWrapper +import org.apache.spark.status.KVUtils.KVStoreScalaSerializer + +private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer { + override def serialize(o: Object): Array[Byte] = o match { + case j: JobDataWrapper => JobDataWrapperSerializer.serialize(j) + case other => super.serialize(other) + } + + override def deserialize[T](data: Array[Byte], klass: Class[T]): T = klass match { + case _ if classOf[JobDataWrapper].isAssignableFrom(klass) => + JobDataWrapperSerializer.deserialize(data).asInstanceOf[T] + case other => super.deserialize(data, klass) + } +} diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index ec92877ce94f..24a8a6844f1f 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.status import java.io.File import java.util.{Date, Properties} -import scala.collection.immutable.Map import scala.reflect.{classTag, ClassTag} import org.scalatest.BeforeAndAfter @@ -35,6 +34,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster._ import org.apache.spark.status.ListenerEventsTestHelper._ import org.apache.spark.status.api.v1 +import org.apache.spark.status.protobuf.KVStoreProtobufSerializer import org.apache.spark.storage._ import org.apache.spark.tags.ExtendedLevelDBTest import org.apache.spark.util.Utils @@ -45,7 +45,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2) private var time: Long = _ - private var testDir: File = _ + protected var testDir: File = _ private var store: ElementTrackingStore = _ private var taskIdTracker = -1L @@ -1904,3 +1904,13 @@ class AppStatusListenerWithRocksDBSuite extends 
AppStatusListenerSuite { override def conf: SparkConf = super.conf .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString) } + +class AppStatusListenerWithProtobufSerializerSuite extends AppStatusListenerSuite { + override def createKVStore: KVStore = + KVUtils.open( + testDir, + getClass().getName(), + conf, + Some(HybridStoreDiskBackend.ROCKSDB), + Some(new KVStoreProtobufSerializer())) +} diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala new file mode 100644 index 000000000000..7ca446e9a941 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import java.util.Date + +import org.apache.spark.{JobExecutionStatus, SparkFunSuite} +import org.apache.spark.status.JobDataWrapper +import org.apache.spark.status.api.v1.JobData + +class KVStoreProtobufSerializerSuite extends SparkFunSuite { + private val serializer = new KVStoreProtobufSerializer() + + test("Job data") { + val input = new JobDataWrapper( + new JobData( + jobId = 1, + name = "test", + description = Some("test description"), + submissionTime = Some(new Date(123456L)), + completionTime = Some(new Date(654321L)), + stageIds = Seq(1, 2, 3, 4), + jobGroup = Some("group"), + status = JobExecutionStatus.UNKNOWN, + numTasks = 2, + numActiveTasks = 3, + numCompletedTasks = 4, + numSkippedTasks = 5, + numFailedTasks = 6, + numKilledTasks = 7, + numCompletedIndices = 8, + numActiveStages = 9, + numCompletedStages = 10, + numSkippedStages = 11, + numFailedStages = 12, + killedTasksSummary = Map("a" -> 1, "b" -> 2)), + Set(1, 2), + Some(999) + ) + + val bytes = serializer.serialize(input) + val result = serializer.deserialize(bytes, classOf[JobDataWrapper]) + assert(result.info.jobId == input.info.jobId) + assert(result.info.description == input.info.description) + assert(result.info.submissionTime == input.info.submissionTime) + assert(result.info.completionTime == input.info.completionTime) + assert(result.info.stageIds == input.info.stageIds) + assert(result.info.jobGroup == input.info.jobGroup) + assert(result.info.status == input.info.status) + assert(result.info.numTasks == input.info.numTasks) + assert(result.info.numActiveTasks == input.info.numActiveTasks) + assert(result.info.numCompletedTasks == input.info.numCompletedTasks) + assert(result.info.numSkippedTasks == input.info.numSkippedTasks) + assert(result.info.numFailedTasks == input.info.numFailedTasks) + assert(result.info.numKilledTasks == input.info.numKilledTasks) + 
assert(result.info.numCompletedIndices == input.info.numCompletedIndices) + assert(result.info.numActiveStages == input.info.numActiveStages) + assert(result.info.numCompletedStages == input.info.numCompletedStages) + assert(result.info.numSkippedStages == input.info.numSkippedStages) + assert(result.info.numFailedStages == input.info.numFailedStages) + assert(result.info.killedTasksSummary == input.info.killedTasksSummary) + assert(result.skippedStages == input.skippedStages) + assert(result.sqlExecutionId == input.sqlExecutionId) + } +} + diff --git a/pom.xml b/pom.xml index 93acf74d2ca4..f8e16585adcc 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ 2.5.0 3.21.9 + <protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version> ${hadoop.version} 3.6.3 2.13.0 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index b82c53a06350..1e00ac7c1906 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -611,9 +611,23 @@ object SparkParallelTestGrouping { object Core { import scala.sys.process.Process + import BuildCommons.protoVersion def buildenv = Process(Seq("uname")).!!.trim.replaceFirst("[^A-Za-z0-9].*", "").toLowerCase def bashpath = Process(Seq("where", "bash")).!!.split("[\r\n]+").head.replace('\\', '/') lazy val settings = Seq( + // Setting version for the protobuf compiler. This has to be propagated to every sub-project + // even if the project is not using it. + PB.protocVersion := BuildCommons.protoVersion, + // For some reason the resolution from the imported Maven build does not work for some + // of these dependencies that we need to shade later on. + libraryDependencies ++= { + Seq( + "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf" + ) + }, + (Compile / PB.targets) := Seq( + PB.gens.java -> (Compile / sourceManaged).value + ), (Compile / resourceGenerators) += Def.task { val buildScript = baseDirectory.value + "/../build/spark-build-info" val targetDir = baseDirectory.value + "/target/extra-resources/"
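The pieces above compose as follows: KVUtils.createKVStore(..., live = true, ...) selects RocksDB plus KVStoreProtobufSerializer, which delegates JobDataWrapper to JobDataWrapperSerializer and everything else to the default Jackson+GZip serializer. The sketch below is not part of this patch; the object name, temp-dir handling, metadata string and file name are illustrative, it has to live under an org.apache.spark package because JobDataWrapper, JobData, KVUtils and Utils are private[spark], and it assumes the generated StoreTypes classes and the RocksDB JNI library are on the classpath. It round-trips a JobDataWrapper through the serializer directly and through a RocksDB store opened with the new serializer parameter of KVUtils.open:

package org.apache.spark.status.protobuf

import java.io.File
import java.util.Date

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.status.{JobDataWrapper, KVUtils}
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.util.Utils

// Illustrative driver, not part of the patch: exercises the new serializer directly
// and through a RocksDB-backed store opened via the extended KVUtils.open signature.
object ProtobufSerializerExample {
  def main(args: Array[String]): Unit = {
    val serializer = new KVStoreProtobufSerializer()

    // A minimal JobDataWrapper; the None values exercise the `optional` proto fields.
    val wrapper = new JobDataWrapper(
      new JobData(
        jobId = 1, name = "example job", description = None,
        submissionTime = Some(new Date()), completionTime = None,
        stageIds = Seq(0, 1), jobGroup = None, status = JobExecutionStatus.RUNNING,
        numTasks = 4, numActiveTasks = 4, numCompletedTasks = 0, numSkippedTasks = 0,
        numFailedTasks = 0, numKilledTasks = 0, numCompletedIndices = 0,
        numActiveStages = 1, numCompletedStages = 0, numSkippedStages = 0,
        numFailedStages = 0, killedTasksSummary = Map.empty),
      Set.empty, None)

    // Direct round trip through the protobuf codec.
    val bytes = serializer.serialize(wrapper)
    val copy = serializer.deserialize(bytes, classOf[JobDataWrapper])
    assert(copy.info.name == wrapper.info.name)

    // Same serializer plugged into a disk store, mirroring what
    // KVUtils.createKVStore(storePath, live = true, conf) now does for the live UI.
    val dir = Utils.createTempDir()
    val store = KVUtils.open(
      new File(dir, "example.rdb"),
      "example-metadata",
      new SparkConf(),
      Some(HybridStoreDiskBackend.ROCKSDB),
      Some(serializer))
    try {
      store.write(wrapper)
      // JobDataWrapper's natural key is the job id.
      assert(store.read(classOf[JobDataWrapper], 1).info.jobId == 1)
    } finally {
      store.close()
      Utils.deleteRecursively(dir)
    }
  }
}

The per-type codec with a fallback to the existing Jackson-based serializer is what allows wrapper classes to be migrated to protobuf one at a time without introducing a separate store-format switch.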