From 972ff8a0da0adea5ea05cb4e10d9b0b875e9c0e4 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Thu, 29 Dec 2022 19:38:48 +0800 Subject: [PATCH 1/4] [SPARK-41768][CORE] Refactor the definition of enum - `JobExecutionStatus` to follow with the code style --- .../spark/status/protobuf/store_types.proto | 10 +++--- .../protobuf/JobDataWrapperSerializer.scala | 12 +++---- .../JobExecutionStatusSerializer.scala | 35 +++++++++++++++++++ .../sql/SQLExecutionUIDataSerializer.scala | 7 ++-- 4 files changed, 47 insertions(+), 17 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 22e22eea1a266..16558d18c92c7 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -19,11 +19,11 @@ syntax = "proto3"; package org.apache.spark.status.protobuf; enum JobExecutionStatus { - UNSPECIFIED = 0; - RUNNING = 1; - SUCCEEDED = 2; - FAILED = 3; - UNKNOWN = 4; + JOB_EXECUTION_STATUS_UNSPECIFIED = 0; + JOB_EXECUTION_STATUS_RUNNING = 1; + JOB_EXECUTION_STATUS_SUCCEEDED = 2; + JOB_EXECUTION_STATUS_FAILED = 3; + JOB_EXECUTION_STATUS_UNKNOWN = 4; } message JobData { diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala index 98ac2d643c964..e2e2a1a8d89cc 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -17,10 +17,10 @@ package org.apache.spark.status.protobuf -import collection.JavaConverters._ import java.util.Date -import org.apache.spark.JobExecutionStatus +import collection.JavaConverters._ + import org.apache.spark.status.JobDataWrapper import org.apache.spark.status.api.v1.JobData import org.apache.spark.status.protobuf.Utils.getOptional @@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { val jobDataBuilder = StoreTypes.JobData.newBuilder() jobDataBuilder.setJobId(jobData.jobId.toLong) .setName(jobData.name) - .setStatus(serializeJobExecutionStatus(jobData.status)) + .setStatus(JobExecutionStatusSerializer.serialize(jobData.status)) .setNumTasks(jobData.numTasks) .setNumActiveTasks(jobData.numActiveTasks) .setNumCompletedTasks(jobData.numCompletedTasks) @@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime)) val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime)) val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup) - val status = JobExecutionStatus.valueOf(info.getStatus.toString) + val status = JobExecutionStatusSerializer.deserialize(info.getStatus) new JobData( jobId = info.getJobId.toInt, @@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe { numFailedStages = info.getNumFailedStages, killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap) } - - private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = { - StoreTypes.JobExecutionStatus.valueOf(j.toString) - } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala new file mode 100644 index 0000000000000..8b9b137ea377d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.JobExecutionStatus + +object JobExecutionStatusSerializer { + + private def PREFIX = "JOB_EXECUTION_STATUS_" + + private[protobuf] def serialize(input: JobExecutionStatus): StoreTypes.JobExecutionStatus = { + StoreTypes.JobExecutionStatus.valueOf(PREFIX + input.toString) + } + + private[protobuf] def deserialize(binary: StoreTypes.JobExecutionStatus): JobExecutionStatus = { + JobExecutionStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala index 77b6f8925cb66..7a4a3e2a55d6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala @@ -21,9 +21,8 @@ import java.util.Date import collection.JavaConverters._ -import org.apache.spark.JobExecutionStatus import org.apache.spark.sql.execution.ui.SQLExecutionUIData -import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes} +import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes} import org.apache.spark.status.protobuf.Utils.getOptional class SQLExecutionUIDataSerializer extends ProtobufSerDe { @@ -46,7 +45,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe { ui.errorMessage.foreach(builder.setErrorMessage) ui.jobs.foreach { case (id, status) => - builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString)) + builder.putJobs(id.toLong, JobExecutionStatusSerializer.serialize(status)) } ui.stages.foreach(stageId => builder.addStages(stageId.toLong)) val metricValues = ui.metricValues @@ -66,7 +65,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe { val metrics = ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)) val jobs = ui.getJobsMap.asScala.map { - case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString) + case (jobId, status) => jobId.toInt -> JobExecutionStatusSerializer.deserialize(status) }.toMap val metricValues = ui.getMetricValuesMap.asScala.map { case (k, v) => k.toLong -> v From 8eff720f1166c7027fdd4a690c44d35e446119c1 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Sun, 1 Jan 2023 11:29:00 +0800 Subject: [PATCH 2/4] [SPARK-41768][CORE] Refactor the definition of enum - `JobExecutionStatus` to follow with the code style --- .../spark/status/protobuf/store_types.proto | 13 ++--- .../DeterministicLevelSerializer.scala | 47 +++++++++++++++++++ .../JobExecutionStatusSerializer.scala | 26 ++++++---- .../RDDOperationGraphWrapperSerializer.scala | 9 ++-- .../protobuf/StageStatusSerializer.scala | 26 ++++++---- 5 files changed, 93 insertions(+), 28 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index e8951a5dc193a..c1ec8211c59bc 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -434,13 +434,14 @@ message RDDOperationEdge { int32 to_id = 2; } +enum DeterministicLevel { + DETERMINISTIC_LEVEL_UNSPECIFIED = 0; + DETERMINISTIC_LEVEL_DETERMINATE = 1; + DETERMINISTIC_LEVEL_UNORDERED = 2; + DETERMINISTIC_LEVEL_INDETERMINATE = 3; +} + message RDDOperationNode { - enum DeterministicLevel { - UNSPECIFIED = 0; - DETERMINATE = 1; - UNORDERED = 2; - INDETERMINATE = 3; - } int32 id = 1; string name = 2; bool cached = 3; diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala new file mode 100644 index 0000000000000..5f28a2fd4e8a5 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import org.apache.spark.rdd.DeterministicLevel +import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} + +private[protobuf] object DeterministicLevelSerializer { + + def serialize(input: DeterministicLevel.Value): GDeterministicLevel = { + input match { + case DeterministicLevel.DETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE + case DeterministicLevel.UNORDERED => + GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED + case DeterministicLevel.INDETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE + } + } + + def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = { + binary match { + case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE => + DeterministicLevel.DETERMINATE + case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED => + DeterministicLevel.UNORDERED + case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE => + DeterministicLevel.INDETERMINATE + case _ => null + } + } +} diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala index 8b9b137ea377d..fd07da61a9e5b 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala @@ -17,19 +17,27 @@ package org.apache.spark.status.protobuf -import org.apache.commons.lang3.StringUtils - import org.apache.spark.JobExecutionStatus +import org.apache.spark.status.protobuf.StoreTypes.{JobExecutionStatus => GJobExecutionStatus} -object JobExecutionStatusSerializer { - - private def PREFIX = "JOB_EXECUTION_STATUS_" +private[protobuf] object JobExecutionStatusSerializer { - private[protobuf] def serialize(input: JobExecutionStatus): StoreTypes.JobExecutionStatus = { - StoreTypes.JobExecutionStatus.valueOf(PREFIX + input.toString) + def serialize(input: JobExecutionStatus): GJobExecutionStatus = { + input match { + case JobExecutionStatus.RUNNING => GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING + case JobExecutionStatus.SUCCEEDED => GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED + case JobExecutionStatus.FAILED => GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED + case JobExecutionStatus.UNKNOWN => GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN + } } - private[protobuf] def deserialize(binary: StoreTypes.JobExecutionStatus): JobExecutionStatus = { - JobExecutionStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX)) + def deserialize(binary: GJobExecutionStatus): JobExecutionStatus = { + binary match { + case GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING => JobExecutionStatus.RUNNING + case GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED => JobExecutionStatus.SUCCEEDED + case GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED => JobExecutionStatus.FAILED + case GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN => JobExecutionStatus.UNKNOWN + case _ => null + } } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala index 8975062082c19..af6ea19908e18 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -19,7 +19,6 @@ package org.apache.spark.status.protobuf import scala.collection.JavaConverters._ -import org.apache.spark.rdd.DeterministicLevel import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper} import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode} @@ -81,8 +80,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { } private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = { - val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel - .valueOf(node.outputDeterministicLevel.toString) + val outputDeterministicLevel = DeterministicLevelSerializer.serialize( + node.outputDeterministicLevel) val builder = StoreTypes.RDDOperationNode.newBuilder() builder.setId(node.id) builder.setName(node.name) @@ -100,8 +99,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { cached = node.getCached, barrier = node.getBarrier, callsite = node.getCallsite, - outputDeterministicLevel = - DeterministicLevel.withName(node.getOutputDeterministicLevel.toString) + outputDeterministicLevel = DeterministicLevelSerializer.deserialize( + node.getOutputDeterministicLevel) ) } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala index 6014379bb1e52..fbd874cf54184 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala @@ -17,19 +17,29 @@ package org.apache.spark.status.protobuf -import org.apache.commons.lang3.StringUtils - import org.apache.spark.status.api.v1.StageStatus +import org.apache.spark.status.protobuf.StoreTypes.{StageStatus => GStageStatus} private[protobuf] object StageStatusSerializer { - private def PREFIX = "STAGE_STATUS_" - - def serialize(input: StageStatus): StoreTypes.StageStatus = { - StoreTypes.StageStatus.valueOf(PREFIX + input.toString) + def serialize(input: StageStatus): GStageStatus = { + input match { + case StageStatus.ACTIVE => GStageStatus.STAGE_STATUS_ACTIVE + case StageStatus.COMPLETE => GStageStatus.STAGE_STATUS_COMPLETE + case StageStatus.FAILED => GStageStatus.STAGE_STATUS_FAILED + case StageStatus.PENDING => GStageStatus.STAGE_STATUS_PENDING + case StageStatus.SKIPPED => GStageStatus.STAGE_STATUS_SKIPPED + } } - def deserialize(binary: StoreTypes.StageStatus): StageStatus = { - StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX)) + def deserialize(binary: GStageStatus): StageStatus = { + binary match { + case GStageStatus.STAGE_STATUS_ACTIVE => StageStatus.ACTIVE + case GStageStatus.STAGE_STATUS_COMPLETE => StageStatus.COMPLETE + case GStageStatus.STAGE_STATUS_FAILED => StageStatus.FAILED + case GStageStatus.STAGE_STATUS_PENDING => StageStatus.PENDING + case GStageStatus.STAGE_STATUS_SKIPPED => StageStatus.SKIPPED + case _ => null + } } } From 0609a69b55e09f5f87b1308723b9a7fbf031f690 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Sun, 1 Jan 2023 20:33:39 +0800 Subject: [PATCH 3/4] [SPARK-41768][CORE] Refactor the definition of enum - `JobExecutionStatus` to follow with the code style --- .../status/protobuf/SerializerBenchmark.scala | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala new file mode 100644 index 0000000000000..7f95a46eb5776 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} + +object SerializerBenchmark extends BenchmarkBase { + + import org.apache.spark.rdd.DeterministicLevel + + private lazy val scalaToPb = Map( + DeterministicLevel.DETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE, + DeterministicLevel.UNORDERED -> GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED, + DeterministicLevel.INDETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE + ) + + val valuesPerIteration = 1000000 + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val benchmark = new Benchmark( + s"Test serialize", + valuesPerIteration, + output = output) + + + val testValues = DeterministicLevel.values + + benchmark.addCase("Use case match") { _: Int => + for (_ <- 0L until valuesPerIteration) { + testValues.foreach { + DeterministicLevelSerializer.serialize + } + } + } + + benchmark.addCase("Use map") { _: Int => + for (_ <- 0L until valuesPerIteration) { + testValues.foreach(scalaToPb) + } + } + + benchmark.run() + } +} + From 92a8545ab681af502f1f9f9a735b5d40ca3d04a6 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 4 Jan 2023 16:42:40 +0800 Subject: [PATCH 4/4] [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style --- .../DeterministicLevelSerializer.scala | 47 -------------- .../RDDOperationGraphWrapperSerializer.scala | 28 +++++++++ .../status/protobuf/SerializerBenchmark.scala | 61 ------------------- 3 files changed, 28 insertions(+), 108 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala delete mode 100644 core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala deleted file mode 100644 index 5f28a2fd4e8a5..0000000000000 --- a/core/src/main/scala/org/apache/spark/status/protobuf/DeterministicLevelSerializer.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.protobuf - -import org.apache.spark.rdd.DeterministicLevel -import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} - -private[protobuf] object DeterministicLevelSerializer { - - def serialize(input: DeterministicLevel.Value): GDeterministicLevel = { - input match { - case DeterministicLevel.DETERMINATE => - GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE - case DeterministicLevel.UNORDERED => - GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED - case DeterministicLevel.INDETERMINATE => - GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE - } - } - - def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = { - binary match { - case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE => - DeterministicLevel.DETERMINATE - case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED => - DeterministicLevel.UNORDERED - case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE => - DeterministicLevel.INDETERMINATE - case _ => null - } - } -} diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala index af6ea19908e18..44622514ac9c9 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -19,7 +19,9 @@ package org.apache.spark.status.protobuf import scala.collection.JavaConverters._ +import org.apache.spark.rdd.DeterministicLevel import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper} +import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode} class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { @@ -117,3 +119,29 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { toId = edge.getToId) } } + +private[protobuf] object DeterministicLevelSerializer { + + def serialize(input: DeterministicLevel.Value): GDeterministicLevel = { + input match { + case DeterministicLevel.DETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE + case DeterministicLevel.UNORDERED => + GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED + case DeterministicLevel.INDETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE + } + } + + def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = { + binary match { + case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE => + DeterministicLevel.DETERMINATE + case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED => + DeterministicLevel.UNORDERED + case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE => + DeterministicLevel.INDETERMINATE + case _ => null + } + } +} diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala b/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala deleted file mode 100644 index 7f95a46eb5776..0000000000000 --- a/core/src/test/scala/org/apache/spark/status/protobuf/SerializerBenchmark.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.protobuf - -import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} -import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} - -object SerializerBenchmark extends BenchmarkBase { - - import org.apache.spark.rdd.DeterministicLevel - - private lazy val scalaToPb = Map( - DeterministicLevel.DETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE, - DeterministicLevel.UNORDERED -> GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED, - DeterministicLevel.INDETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE - ) - - val valuesPerIteration = 1000000 - - override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { - val benchmark = new Benchmark( - s"Test serialize", - valuesPerIteration, - output = output) - - - val testValues = DeterministicLevel.values - - benchmark.addCase("Use case match") { _: Int => - for (_ <- 0L until valuesPerIteration) { - testValues.foreach { - DeterministicLevelSerializer.serialize - } - } - } - - benchmark.addCase("Use map") { _: Int => - for (_ <- 0L until valuesPerIteration) { - testValues.foreach(scalaToPb) - } - } - - benchmark.run() - } -} -