Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf;

enum JobExecutionStatus {
JOB_EXECUTION_STATUS_UNSPECIFIED = 0;
RUNNING = 1;
SUCCEEDED = 2;
FAILED = 3;
UNKNOWN = 4;
JOB_EXECUTION_STATUS_RUNNING = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels redundant to have the name in the enum class and in the constant name?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen Yes, but we are following https://developers.google.com/protocol-buffers/docs/style#enums here. The purpose is to avoid naming conflicts. For example, if there is another enum containing FAILED or SUCCEEDED, the Protobuf compiler won't fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh ok I get it

JOB_EXECUTION_STATUS_SUCCEEDED = 2;
JOB_EXECUTION_STATUS_FAILED = 3;
JOB_EXECUTION_STATUS_UNKNOWN = 4;
}

message JobData {
Expand Down Expand Up @@ -434,13 +434,14 @@ message RDDOperationEdge {
int32 to_id = 2;
}

// Output-determinism level of an RDD operation. Mirrors the Scala-side
// org.apache.spark.rdd.DeterministicLevel enumeration; conversion between the
// two is handled by DeterministicLevelSerializer.
// Values carry the DETERMINISTIC_LEVEL_ prefix (per the protobuf enum style
// guide) so names cannot clash with constants of other enums in this file.
enum DeterministicLevel {
// Zero value: required default; no Scala-side counterpart (maps to null on deserialize).
DETERMINISTIC_LEVEL_UNSPECIFIED = 0;
DETERMINISTIC_LEVEL_DETERMINATE = 1;
DETERMINISTIC_LEVEL_UNORDERED = 2;
DETERMINISTIC_LEVEL_INDETERMINATE = 3;
}

message RDDOperationNode {
enum DeterministicLevel {
UNSPECIFIED = 0;
DETERMINATE = 1;
UNORDERED = 2;
INDETERMINATE = 3;
}
int32 id = 1;
string name = 2;
bool cached = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.spark.status.protobuf

import collection.JavaConverters._
import java.util.Date

import org.apache.spark.JobExecutionStatus
import collection.JavaConverters._

import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.status.protobuf.Utils.getOptional
Expand Down Expand Up @@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
val jobDataBuilder = StoreTypes.JobData.newBuilder()
jobDataBuilder.setJobId(jobData.jobId.toLong)
.setName(jobData.name)
.setStatus(serializeJobExecutionStatus(jobData.status))
.setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
.setNumTasks(jobData.numTasks)
.setNumActiveTasks(jobData.numActiveTasks)
.setNumCompletedTasks(jobData.numCompletedTasks)
Expand Down Expand Up @@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime))
val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime))
val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
val status = JobExecutionStatus.valueOf(info.getStatus.toString)
val status = JobExecutionStatusSerializer.deserialize(info.getStatus)

new JobData(
jobId = info.getJobId.toInt,
Expand All @@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
numFailedStages = info.getNumFailedStages,
killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
}

private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = {
StoreTypes.JobExecutionStatus.valueOf(j.toString)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.protobuf

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.protobuf.StoreTypes.{JobExecutionStatus => GJobExecutionStatus}

private[protobuf] object JobExecutionStatusSerializer {

  // Bijection between the Spark-side enum and its protobuf counterpart.
  // Kept as data (rather than twin pattern matches) so the two directions
  // cannot drift apart; the reverse table is derived from the forward one.
  private val toProto: Map[JobExecutionStatus, GJobExecutionStatus] = Map(
    JobExecutionStatus.RUNNING -> GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING,
    JobExecutionStatus.SUCCEEDED -> GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED,
    JobExecutionStatus.FAILED -> GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED,
    JobExecutionStatus.UNKNOWN -> GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN)

  private val fromProto: Map[GJobExecutionStatus, JobExecutionStatus] =
    toProto.map { case (k, v) => v -> k }

  /** Converts a Spark [[JobExecutionStatus]] to its protobuf representation. */
  def serialize(input: JobExecutionStatus): GJobExecutionStatus = toProto(input)

  /**
   * Converts a protobuf status back to the Spark enum. Unrecognized or
   * UNSPECIFIED values yield null, matching the original fall-through case.
   */
  def deserialize(binary: GJobExecutionStatus): JobExecutionStatus =
    fromProto.getOrElse(binary, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel}
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}

class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
Expand Down Expand Up @@ -81,8 +82,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
}

private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
.valueOf(node.outputDeterministicLevel.toString)
val outputDeterministicLevel = DeterministicLevelSerializer.serialize(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since DeterministicLevelSerializer is only used here, shall we move it into this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

node.outputDeterministicLevel)
val builder = StoreTypes.RDDOperationNode.newBuilder()
builder.setId(node.id)
builder.setName(node.name)
Expand All @@ -100,8 +101,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
cached = node.getCached,
barrier = node.getBarrier,
callsite = node.getCallsite,
outputDeterministicLevel =
DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
outputDeterministicLevel = DeterministicLevelSerializer.deserialize(
node.getOutputDeterministicLevel)
)
}

Expand All @@ -118,3 +119,29 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
toId = edge.getToId)
}
}

private[protobuf] object DeterministicLevelSerializer {

  // Single source of truth for the Scala-enum <-> protobuf-enum pairing;
  // the deserialization table is the mirror image of this one, so adding a
  // level requires touching exactly one place.
  private val toProto: Map[DeterministicLevel.Value, GDeterministicLevel] = Map(
    DeterministicLevel.DETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE,
    DeterministicLevel.UNORDERED -> GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED,
    DeterministicLevel.INDETERMINATE -> GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE)

  private val fromProto: Map[GDeterministicLevel, DeterministicLevel.Value] =
    toProto.map { case (k, v) => v -> k }

  /** Converts an RDD [[DeterministicLevel]] to its protobuf representation. */
  def serialize(input: DeterministicLevel.Value): GDeterministicLevel = toProto(input)

  /**
   * Converts a protobuf level back to the Scala enumeration. Unrecognized or
   * UNSPECIFIED values yield null, matching the original fall-through case.
   */
  def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value =
    fromProto.getOrElse(binary, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,29 @@

package org.apache.spark.status.protobuf

import org.apache.commons.lang3.StringUtils

import org.apache.spark.status.api.v1.StageStatus
import org.apache.spark.status.protobuf.StoreTypes.{StageStatus => GStageStatus}

private[protobuf] object StageStatusSerializer {

  // Forward mapping declared once; the reverse direction is derived by
  // swapping pairs, guaranteeing serialize/deserialize stay in sync.
  private val toProto: Map[StageStatus, GStageStatus] = Map(
    StageStatus.ACTIVE -> GStageStatus.STAGE_STATUS_ACTIVE,
    StageStatus.COMPLETE -> GStageStatus.STAGE_STATUS_COMPLETE,
    StageStatus.FAILED -> GStageStatus.STAGE_STATUS_FAILED,
    StageStatus.PENDING -> GStageStatus.STAGE_STATUS_PENDING,
    StageStatus.SKIPPED -> GStageStatus.STAGE_STATUS_SKIPPED)

  private val fromProto: Map[GStageStatus, StageStatus] =
    toProto.map { case (k, v) => v -> k }

  /** Converts a Spark [[StageStatus]] to its protobuf representation. */
  def serialize(input: StageStatus): GStageStatus = toProto(input)

  /**
   * Converts a protobuf status back to the Spark enum. Unrecognized or
   * UNSPECIFIED values yield null, matching the original fall-through case.
   */
  def deserialize(binary: GStageStatus): StageStatus =
    fromProto.getOrElse(binary, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import java.util.Date

import collection.JavaConverters._

import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.Utils.getOptional

class SQLExecutionUIDataSerializer extends ProtobufSerDe {
Expand All @@ -46,7 +45,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
ui.errorMessage.foreach(builder.setErrorMessage)
ui.jobs.foreach {
case (id, status) =>
builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
builder.putJobs(id.toLong, JobExecutionStatusSerializer.serialize(status))
}
ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
val metricValues = ui.metricValues
Expand All @@ -66,7 +65,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
val metrics =
ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m))
val jobs = ui.getJobsMap.asScala.map {
case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
case (jobId, status) => jobId.toInt -> JobExecutionStatusSerializer.deserialize(status)
}.toMap
val metricValues = ui.getMetricValuesMap.asScala.map {
case (k, v) => k.toLong -> v
Expand Down