Skip to content

Commit

Permalink
restore the backend deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
bobbai00 committed Sep 20, 2024
1 parent 6bf5f47 commit c2457fa
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class TexeraWebApplication

environment.jersey.register(classOf[SystemMetadataResource])
// environment.jersey().register(classOf[MockKillWorkerResource])
environment.jersey.register(classOf[SchemaPropagationResource])

if (AmberConfig.isUserSystemEnabled) {
// register JWT Auth layer
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package edu.uci.ics.texera.web.model.websocket.request

import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.workflow.LogicalLink

case class EditingTimeCompilationRequest(
operators: List[LogicalOp],
links: List[LogicalLink],
opsToViewResult: List[String],
opsToReuseResult: List[String]
) extends TexeraWebSocketRequest {

def toLogicalPlanPojo: LogicalPlanPojo = {
LogicalPlanPojo(operators, links, opsToViewResult, opsToReuseResult)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import edu.uci.ics.texera.web.model.websocket.request.python.{
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(
Array(
new Type(value = classOf[EditingTimeCompilationRequest]),
new Type(value = classOf[HeartBeatRequest]),
new Type(value = classOf[ModifyLogicRequest]),
new Type(value = classOf[ResultExportRequest]),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package edu.uci.ics.texera.web.resource
import com.typesafe.scalalogging.LazyLogging
import edu.uci.ics.amber.engine.common.virtualidentity.WorkflowIdentity
import edu.uci.ics.texera.Utils
import edu.uci.ics.texera.web.auth.SessionUser
import edu.uci.ics.texera.web.model.http.response.SchemaPropagationResponse
import edu.uci.ics.texera.web.model.websocket.request.LogicalPlanPojo
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.workflow.{LogicalPlan, PhysicalPlan}
import io.dropwizard.auth.Auth
import org.jooq.types.UInteger

import javax.annotation.security.RolesAllowed
import javax.ws.rs._
import javax.ws.rs.core.MediaType

/**
* The SchemaPropagation functionality will be included by the standalone compiling service
*/
@Deprecated
@Consumes(Array(MediaType.APPLICATION_JSON))
@Produces(Array(MediaType.APPLICATION_JSON))
@Path("/queryplan")
class SchemaPropagationResource extends LazyLogging {

@Deprecated
@POST
@Path("/autocomplete/{wid}")
@RolesAllowed(Array("REGULAR", "ADMIN"))
def suggestAutocompleteSchema(
workflowStr: String,
@PathParam("wid") wid: UInteger,
@Auth sessionUser: SessionUser
): SchemaPropagationResponse = {

val logicalPlanPojo = Utils.objectMapper.readValue(workflowStr, classOf[LogicalPlanPojo])

val context = new WorkflowContext(
workflowId = WorkflowIdentity(wid.toString.toLong)
)

val logicalPlan = LogicalPlan(logicalPlanPojo)

// the PhysicalPlan with topology expanded.
val physicalPlan = PhysicalPlan(context, logicalPlan, None)

// Extract physical input schemas, excluding internal ports
val physicalInputSchemas = physicalPlan.operators.map { physicalOp =>
physicalOp.id -> physicalOp.inputPorts.values
.filterNot(_._1.id.internal)
.map {
case (port, _, schema) => port.id -> schema.toOption
}
}

// Group the physical input schemas by their logical operator ID and consolidate the schemas
val logicalInputSchemas = physicalInputSchemas
.groupBy(_._1.logicalOpId)
.view
.mapValues(_.flatMap(_._2).toList.sortBy(_._1.id).map(_._2))
.toMap

// Prepare the response content by extracting attributes from the schemas,
// ignoring errors (errors are reported through EditingTimeCompilationRequest)
val responseContent = logicalInputSchemas.map {
case (logicalOpId, schemas) =>
logicalOpId.id -> schemas.map(_.map(_.getAttributes))
}
SchemaPropagationResponse(0, responseContent, null)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@ import edu.uci.ics.amber.engine.common.virtualidentity.WorkflowIdentity
import edu.uci.ics.amber.error.ErrorUtils.getStackTraceWithAllCauses
import edu.uci.ics.texera.Utils.objectMapper
import edu.uci.ics.texera.web.model.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.event.{WorkflowErrorEvent, WorkflowStateEvent}
import edu.uci.ics.texera.web.model.websocket.event.{
CacheStatusUpdateEvent,
WorkflowErrorEvent,
WorkflowStateEvent
}
import edu.uci.ics.texera.web.model.websocket.request._
import edu.uci.ics.texera.web.model.websocket.response._
import edu.uci.ics.texera.web.service.WorkflowService
import edu.uci.ics.texera.web.service.{WorkflowCacheChecker, WorkflowService}
import edu.uci.ics.texera.web.storage.ExecutionStateStore
import edu.uci.ics.texera.web.workflowruntimestate.FatalErrorType.COMPILATION_ERROR
import edu.uci.ics.texera.web.workflowruntimestate.WorkflowAggregatedState.{PAUSED, RUNNING}
import edu.uci.ics.texera.web.workflowruntimestate.WorkflowFatalError
import edu.uci.ics.texera.web.{ServletAwareConfigurator, SessionState}
import edu.uci.ics.texera.workflow.common.WorkflowContext
import edu.uci.ics.texera.workflow.common.workflow.WorkflowCompiler

import java.time.Instant
import javax.websocket._
Expand Down Expand Up @@ -82,6 +90,42 @@ class WorkflowWebsocketResource extends LazyLogging {
)
sessionState.send(modifyLogicResponse)
}
case editingTimeCompilationRequest: EditingTimeCompilationRequest =>
// TODO: remove this after separating the workflow compiler as a standalone service
val stateStore = if (executionStateOpt.isDefined) {
val currentState =
executionStateOpt.get.executionStateStore.metadataStore.getState.state
if (currentState == RUNNING || currentState == PAUSED) {
// disable check if the workflow execution is active.
return
}
executionStateOpt.get.executionStateStore
} else {
new ExecutionStateStore()
}
val workflowContext = new WorkflowContext(
sessionState.getCurrentWorkflowState.get.workflowId
)
try {
val workflowCompiler =
new WorkflowCompiler(workflowContext)
val newPlan = workflowCompiler.compileLogicalPlan(
editingTimeCompilationRequest.toLogicalPlanPojo,
stateStore
)
val validateResult = WorkflowCacheChecker.handleCacheStatusUpdate(
workflowStateOpt.get.lastCompletedLogicalPlan,
newPlan,
editingTimeCompilationRequest
)
sessionState.send(CacheStatusUpdateEvent(validateResult))
} catch {
case t: Throwable => // skip, rethrow this exception will overwrite the compilation errors reported below.
} finally {
if (stateStore.metadataStore.getState.fatalErrors.nonEmpty) {
sessionState.send(WorkflowErrorEvent(stateStore.metadataStore.getState.fatalErrors))
}
}
case workflowExecuteRequest: WorkflowExecuteRequest =>
workflowStateOpt match {
case Some(workflow) => workflow.initExecutionService(workflowExecuteRequest, uidOpt)
Expand Down Expand Up @@ -121,4 +165,4 @@ class WorkflowWebsocketResource extends LazyLogging {

}

}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,28 @@
package edu.uci.ics.texera.web.service

import edu.uci.ics.amber.engine.common.virtualidentity.OperatorIdentity
import edu.uci.ics.texera.web.model.websocket.request.EditingTimeCompilationRequest
import edu.uci.ics.texera.workflow.common.workflow.LogicalPlan

import scala.collection.mutable

object WorkflowCacheChecker {

def handleCacheStatusUpdate(
oldPlan: Option[LogicalPlan],
newPlan: LogicalPlan,
request: EditingTimeCompilationRequest
): Map[String, String] = {
val validCacheOps = new WorkflowCacheChecker(oldPlan, newPlan).getValidCacheReuse
val cacheUpdateResult = request.opsToReuseResult
.map(idString => OperatorIdentity(idString))
.map(opId => (opId.id, if (validCacheOps.contains(opId)) "cache valid" else "cache invalid"))
.toMap
cacheUpdateResult
}

}

class WorkflowCacheChecker(oldWorkflowOpt: Option[LogicalPlan], newWorkflow: LogicalPlan) {

private val equivalenceClass = new mutable.HashMap[String, Int]()
Expand Down Expand Up @@ -84,4 +102,4 @@ class WorkflowCacheChecker(oldWorkflowOpt: Option[LogicalPlan], newWorkflow: Log
.toSet
}

}
}

0 comments on commit c2457fa

Please sign in to comment.