Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GUI and deployment scripts use compiling service #2848

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ server:
applicationContextPath: /
applicationConnectors:
- type: http
port: 9090
port: 8082
adminConnectors:
- type: http
port: 9091
port: 8083
requestLog:
type: classic
timeZone: UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ object PhysicalOp {
@JsonIgnoreProperties(
Array(
"opExecInitInfo", // function type, ignore it
"locationPreference", // runtime info, ignore it
"partitionRequirement", // runtime info, ignore it
"derivePartition", // function type, ignore it
"inputPorts", // may contain very long stacktrace, ignore it
"outputPorts", // same reason with above
Expand Down Expand Up @@ -225,6 +227,7 @@ case class PhysicalOp(
outputPorts.forall(port => port._2._2.isEmpty)
}

@JsonIgnore
def isPythonBased: Boolean = {
opExecInitInfo match {
case opExecInfo: OpExecInitInfoWithCode =>
Expand Down
23 changes: 23 additions & 0 deletions core/amber/src/main/scala/edu/uci/ics/texera/web/CORSFilter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package edu.uci.ics.texera.web

import javax.ws.rs.container.{
ContainerRequestContext,
ContainerResponseContext,
ContainerResponseFilter
}
import javax.ws.rs.ext.Provider
import javax.ws.rs.core.MultivaluedMap

@Provider
class CORSFilter extends ContainerResponseFilter {
override def filter(
requestContext: ContainerRequestContext,
responseContext: ContainerResponseContext
): Unit = {
val headers: MultivaluedMap[String, AnyRef] = responseContext.getHeaders
headers.add("Access-Control-Allow-Origin", "*")
headers.add("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT, OPTIONS")
headers.add("Access-Control-Allow-Headers", "Authorization, Content-Type, Accept")
headers.add("Access-Control-Allow-Credentials", "true")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ class TexeraWebApplication

environment.jersey.register(classOf[SystemMetadataResource])
// environment.jersey().register(classOf[MockKillWorkerResource])
environment.jersey.register(classOf[SchemaPropagationResource])

if (AmberConfig.isUserSystemEnabled) {
// register JWT Auth layer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class TexeraWorkflowCompilingService
// serve backend at /api/texera
environment.jersey.setUrlPattern("/api/texera/*")

// register CORS filter
environment.jersey.register(classOf[CORSFilter])

// register the compilation endpoint
environment.jersey.register(classOf[WorkflowCompilationResource])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.workflow.LogicalLink

case class EditingTimeCompilationRequest(
operators: List[LogicalOp],
links: List[LogicalLink],
opsToViewResult: List[String],
opsToReuseResult: List[String]
) extends TexeraWebSocketRequest {
operators: List[LogicalOp],
links: List[LogicalLink],
opsToViewResult: List[String],
opsToReuseResult: List[String]
) extends TexeraWebSocketRequest {

def toLogicalPlanPojo: LogicalPlanPojo = {
LogicalPlanPojo(operators, links, opsToViewResult, opsToReuseResult)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ class SchemaPropagationResource extends LazyLogging {
@Path("/autocomplete/{wid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
def suggestAutocompleteSchema(
workflowStr: String,
@PathParam("wid") wid: UInteger,
@Auth sessionUser: SessionUser
): SchemaPropagationResponse = {
workflowStr: String,
@PathParam("wid") wid: UInteger,
@Auth sessionUser: SessionUser
): SchemaPropagationResponse = {

val logicalPlanPojo = Utils.objectMapper.readValue(workflowStr, classOf[LogicalPlanPojo])

Expand All @@ -42,7 +42,7 @@ class SchemaPropagationResource extends LazyLogging {
val logicalPlan = LogicalPlan(logicalPlanPojo)

// the PhysicalPlan with topology expanded.
val physicalPlan = PhysicalPlan(context, logicalPlan)
val physicalPlan = PhysicalPlan(context, logicalPlan, None)

// Extract physical input schemas, excluding internal ports
val physicalInputSchemas = physicalPlan.operators.map { physicalOp =>
Expand Down Expand Up @@ -70,4 +70,4 @@ class SchemaPropagationResource extends LazyLogging {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.web.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.tuple.schema.Attribute
import edu.uci.ics.texera.workflow.common.workflow.{PhysicalPlan, WorkflowCompiler}
Expand All @@ -16,7 +17,7 @@ import javax.ws.rs.core.MediaType
case class WorkflowCompilationResponse(
physicalPlan: Option[PhysicalPlan],
operatorInputSchemas: Map[String, List[Option[List[Attribute]]]],
operatorErrors: Map[String, String]
operatorErrors: List[WorkflowFatalError]
)

@Consumes(Array(MediaType.APPLICATION_JSON))
Expand All @@ -40,7 +41,7 @@ class WorkflowCompilationResource extends LazyLogging {
val workflowCompilationResult =
new WorkflowCompiler(context).compile(logicalPlanPojo)
// return the result
WorkflowCompilationResponse(
val response = WorkflowCompilationResponse(
physicalPlan = workflowCompilationResult.physicalPlan,
operatorInputSchemas = workflowCompilationResult.operatorIdToInputSchemas.map {
case (operatorIdentity, schemas) =>
Expand All @@ -53,9 +54,24 @@ class WorkflowCompilationResource extends LazyLogging {
}
(opId, attributes)
},
operatorErrors = workflowCompilationResult.operatorIdToError.map {
case (operatorIdentity, error) => (operatorIdentity.id, error.toString)
}
operatorErrors = workflowCompilationResult.operatorErrors
)

println(
s"OperatorInputSchemas: ${Utils.objectMapper.writeValueAsString(response.operatorInputSchemas)}"
)
println(s"OperatorErrors: ${Utils.objectMapper.writeValueAsString(response.operatorErrors)}")
try {
val physicalPlanStr = Utils.objectMapper.writeValueAsString(response.physicalPlan)
println(s"PhysicalPlan: $physicalPlanStr")
} catch {
case e: Exception =>
logger.error(
"Error serializing PhysicalPlan",
e
) // This will log the error message and the stack trace
}

response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,42 +90,6 @@ class WorkflowWebsocketResource extends LazyLogging {
)
sessionState.send(modifyLogicResponse)
}
case editingTimeCompilationRequest: EditingTimeCompilationRequest =>
// TODO: remove this after separating the workflow compiler as a standalone service
val stateStore = if (executionStateOpt.isDefined) {
val currentState =
executionStateOpt.get.executionStateStore.metadataStore.getState.state
if (currentState == RUNNING || currentState == PAUSED) {
// disable check if the workflow execution is active.
return
}
executionStateOpt.get.executionStateStore
} else {
new ExecutionStateStore()
}
val workflowContext = new WorkflowContext(
sessionState.getCurrentWorkflowState.get.workflowId
)
try {
val workflowCompiler =
new WorkflowCompiler(workflowContext)
val newPlan = workflowCompiler.compileLogicalPlan(
editingTimeCompilationRequest.toLogicalPlanPojo,
stateStore
)
val validateResult = WorkflowCacheChecker.handleCacheStatusUpdate(
workflowStateOpt.get.lastCompletedLogicalPlan,
newPlan,
editingTimeCompilationRequest
)
sessionState.send(CacheStatusUpdateEvent(validateResult))
} catch {
case t: Throwable => // skip, rethrow this exception will overwrite the compilation errors reported below.
} finally {
if (stateStore.metadataStore.getState.fatalErrors.nonEmpty) {
sessionState.send(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors))
}
}
case workflowExecuteRequest: WorkflowExecuteRequest =>
workflowStateOpt match {
case Some(workflow) => workflow.initExecutionService(workflowExecuteRequest, uidOpt)
Expand Down Expand Up @@ -165,4 +129,4 @@ class WorkflowWebsocketResource extends LazyLogging {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import scala.collection.mutable
object WorkflowCacheChecker {

def handleCacheStatusUpdate(
oldPlan: Option[LogicalPlan],
newPlan: LogicalPlan,
request: EditingTimeCompilationRequest
): Map[String, String] = {
oldPlan: Option[LogicalPlan],
newPlan: LogicalPlan,
request: EditingTimeCompilationRequest
): Map[String, String] = {
val validCacheOps = new WorkflowCacheChecker(oldPlan, newPlan).getValidCacheReuse
val cacheUpdateResult = request.opsToReuseResult
.map(idString => OperatorIdentity(idString))
Expand Down Expand Up @@ -102,4 +102,4 @@ class WorkflowCacheChecker(oldWorkflowOpt: Option[LogicalPlan], newWorkflow: Log
.toSet
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public enum AttributeType implements Serializable {
* New AttributeTypes might need to be converted into a numerical value in order to perform
* aggregations.
* <p>
* 5. SchemaPropagationService.SchemaAttribute
* <code>src/app/workspace/service/dynamic-schema/schema-propagation/schema-propagation.service.ts</code>
* 5. WorkflowCompilingService.SchemaAttribute
* <code>src/app/workspace/service/workflow-compilation/workflow-compiling.service.ts</code>
* Declare the frontend SchemaAttribute for the new AttributeType.
* <p>
* 6. ArrowUtils (Java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,61 @@ import org.jgrapht.graph.DirectedAcyclicGraph
import org.jgrapht.traverse.TopologicalOrderIterator
import org.jgrapht.util.SupplierUtil

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.{IteratorHasAsScala, ListHasAsScala, SetHasAsScala}
import scala.util.{Failure, Success, Try}

object PhysicalPlan {
def apply(context: WorkflowContext, logicalPlan: LogicalPlan): PhysicalPlan = {
def apply(
context: WorkflowContext,
logicalPlan: LogicalPlan,
errorList: Option[ArrayBuffer[(OperatorIdentity, Throwable)]]
): PhysicalPlan = {

var physicalPlan = PhysicalPlan(operators = Set.empty, links = Set.empty)

logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId => {
val logicalOp = logicalPlan.getOperator(logicalOpId)
logicalOp.setContext(context)
logicalPlan.getTopologicalOpIds.asScala.foreach(logicalOpId =>
Try {
val logicalOp = logicalPlan.getOperator(logicalOpId)
logicalOp.setContext(context)

val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)
subPlan
.topologicalIterator()
.map(subPlan.getOperator)
.foreach({ physicalOp =>
{
val externalLinks = logicalPlan
.getUpstreamLinks(logicalOp.operatorIdentifier)
.filter(link => physicalOp.inputPorts.contains(link.toPortId))
.flatMap { link =>
physicalPlan
.getPhysicalOpsOfLogicalOp(link.fromOpId)
.find(_.outputPorts.contains(link.fromPortId))
.map(fromOp =>
PhysicalLink(fromOp.id, link.fromPortId, physicalOp.id, link.toPortId)
)
}

val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id)

// Add the operator to the physical plan
physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema())

// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
.foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }
val subPlan = logicalOp.getPhysicalPlan(context.workflowId, context.executionId)
subPlan
.topologicalIterator()
.map(subPlan.getOperator)
.foreach({ physicalOp =>
{
val externalLinks = logicalPlan
.getUpstreamLinks(logicalOp.operatorIdentifier)
.filter(link => physicalOp.inputPorts.contains(link.toPortId))
.flatMap { link =>
physicalPlan
.getPhysicalOpsOfLogicalOp(link.fromOpId)
.find(_.outputPorts.contains(link.fromPortId))
.map(fromOp =>
PhysicalLink(fromOp.id, link.fromPortId, physicalOp.id, link.toPortId)
)
}

val internalLinks = subPlan.getUpstreamPhysicalLinks(physicalOp.id)

// Add the operator to the physical plan
physicalPlan = physicalPlan.addOperator(physicalOp.propagateSchema())

// Add all the links to the physical plan
physicalPlan = (externalLinks ++ internalLinks)
.foldLeft(physicalPlan) { (plan, link) => plan.addLink(link) }
}
})
} match {
case Success(_) =>
case Failure(err) =>
errorList match {
case Some(list) => list.append((logicalOpId, err))
case None => throw err
}
})
})
}
)
physicalPlan
}

Expand Down
Loading
Loading