Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vx followup #6732

Merged
merged 38 commits into from
Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
2e4d4d2
optimized workflow list
normanrz Dec 15, 2022
ff475a5
merge
normanrz Dec 18, 2022
02e3dba
fix
normanrz Dec 19, 2022
c402961
removed circular deps and other stuff
normanrz Dec 21, 2022
a42168e
progress
normanrz Jan 2, 2023
310a449
merge
normanrz Jan 5, 2023
12c7b1f
Merge remote-tracking branch 'origin/master' into vx-followup
normanrz Jan 5, 2023
a98a445
merge
normanrz Jan 6, 2023
1c198b8
moar sql
normanrz Jan 8, 2023
82eb63a
Merge remote-tracking branch 'origin/master' into vx-followup
normanrz Jan 9, 2023
903d23a
fixes
normanrz Jan 9, 2023
35fa8d1
fixes
normanrz Jan 10, 2023
cae1c4f
pr feedback
normanrz Jan 10, 2023
03bf232
fix
normanrz Jan 10, 2023
292502d
ui tweaks
normanrz Jan 10, 2023
ae300f3
ui tweaks
normanrz Jan 10, 2023
ff6fd0e
Merge branch 'master' into vx-followup
fm3 Jan 11, 2023
1cd8147
ui tweaks
normanrz Jan 11, 2023
d5de8fa
merge
normanrz Jan 11, 2023
0f7c4e5
changelog
normanrz Jan 11, 2023
b9a50ce
ui tweaks
normanrz Jan 12, 2023
3e03450
merge
normanrz Jan 12, 2023
fcc48a3
Merge remote-tracking branch 'origin/master' into vx-followup
normanrz Jan 12, 2023
b8d0949
restructures vx tables
normanrz Jan 12, 2023
7328722
fixes
normanrz Jan 12, 2023
3109f84
fix schema
normanrz Jan 13, 2023
31113df
fixes
normanrz Jan 13, 2023
8a47397
merge
normanrz Jan 16, 2023
8611dc1
faster findTaskRuns query
normanrz Jan 16, 2023
13910b5
merge
normanrz Jan 17, 2023
d016d08
merge
normanrz Jan 17, 2023
4e4f506
new evolution num
normanrz Jan 17, 2023
34d5d63
pretty
normanrz Jan 17, 2023
5e4f130
evolutions
normanrz Jan 17, 2023
ff51ad5
evolutions
normanrz Jan 17, 2023
31fc651
schema
normanrz Jan 17, 2023
508f8e2
Merge branch 'master' into vx-followup
normanrz Jan 17, 2023
5bd6198
Merge branch 'master' into vx-followup
normanrz Jan 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 69 additions & 79 deletions app/controllers/VoxelyticsController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import utils.{ObjectId, WkConf}

import javax.inject.Inject
import scala.concurrent.ExecutionContext
import scala.util.Try

class VoxelyticsController @Inject()(
organizationDAO: OrganizationDAO,
Expand Down Expand Up @@ -56,12 +57,12 @@ class VoxelyticsController @Inject()(
} yield Ok
}

def listWorkflows(workflowHash: Option[String]): Action[AnyContent] =
def listWorkflows: Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
// Auth is implemented in `voxelyticsDAO.findRuns`
runs <- voxelyticsDAO.findRuns(request.identity, None, workflowHash, conf.staleTimeout, allowUnlisted = false)
// Auth is implemented in `voxelyticsDAO.findRunsForWorkflowListing`
runs <- voxelyticsDAO.findRunsForWorkflowListing(request.identity, conf.staleTimeout)
result <- if (runs.nonEmpty) {
listWorkflowsWithRuns(request, runs)
} else {
Expand All @@ -70,30 +71,33 @@ class VoxelyticsController @Inject()(
} yield JsonOk(result)
}

private def listWorkflowsWithRuns(request: SecuredRequest[WkEnv, AnyContent], runs: List[RunEntry]): Fox[JsArray] =
private def listWorkflowsWithRuns(request: SecuredRequest[WkEnv, AnyContent],
runs: List[WorkflowListingRunEntry]): Fox[JsArray] =
for {
_ <- bool2Fox(runs.nonEmpty) // just asserting once more
taskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization, runs.map(_.id), conf.staleTimeout)
_ <- bool2Fox(taskRuns.nonEmpty) ?~> "voxelytics.noTaskFound" ~> NOT_FOUND
workflowTaskStatistics <- voxelyticsDAO.findWorkflowTaskStatistics(request.identity,
runs.map(_.workflow_hash).toSet,
conf.staleTimeout)
_ <- bool2Fox(workflowTaskStatistics.nonEmpty) ?~> "voxelytics.noTaskFound" ~> NOT_FOUND
workflows <- voxelyticsDAO.findWorkflowsByHashAndOrganization(request.identity._organization,
runs.map(_.workflow_hash).toSet)
_ <- bool2Fox(workflows.nonEmpty) ?~> "voxelytics.noWorkflowFound" ~> NOT_FOUND

workflowsAsJson = JsArray(workflows.flatMap(workflow => {
val workflowRuns = runs.filter(run => run.workflow_hash == workflow.hash)
if (workflowRuns.nonEmpty) {
val state, beginTime, endTime = voxelyticsService.aggregateBeginEndTime(workflowRuns)
val state = workflowRuns.maxBy(_.beginTime).state
val beginTime = workflowRuns.map(_.beginTime).min
val endTime = Try(workflowRuns.flatMap(_.endTime).max).toOption
Some(
Json.obj(
"name" -> workflow.name,
"hash" -> workflow.hash,
"beginTime" -> beginTime,
"endTime" -> endTime,
"state" -> state.toString(),
"runs" -> workflowRuns.map(run => {
val tasks = taskRuns.filter(taskRun => taskRun.runId == run.id)
voxelyticsService.runPublicWrites(run, tasks)
})
"taskStatistics" -> workflowTaskStatistics.get(workflow.hash),
"runs" -> workflowRuns
))
} else {
None
Expand Down Expand Up @@ -128,26 +132,21 @@ class VoxelyticsController @Inject()(
mostRecentRun <- sortedRuns.headOption ?~> "voxelytics.zeroRunWorkflow"

// Fetch task runs for all runs
allTaskRuns <- voxelyticsDAO.findTaskRuns(request.identity._organization,
sortedRuns.map(_.id),
conf.staleTimeout)
allTaskRuns <- voxelyticsDAO.findTaskRuns(sortedRuns.map(_.id), conf.staleTimeout)
combinedTaskRuns <- voxelyticsDAO.findCombinedTaskRuns(sortedRuns.map(_.id), conf.staleTimeout)

// Select one representative "task run" for each task
// This will be the most recent run that is running or finished or the most recent run
combinedTaskRuns = voxelyticsService.combineTaskRuns(allTaskRuns, mostRecentRun.id)
// Fetch artifact data for task runs
artifacts <- voxelyticsDAO.findArtifacts(sortedRuns.map(_.id), conf.staleTimeout)

// Fetch artifact data for selected/combined task runs
artifacts <- voxelyticsDAO.findArtifacts(combinedTaskRuns.map(_.taskId))
tasks <- voxelyticsDAO.findTasks(combinedTaskRuns)
// Fetch task configs
tasks <- voxelyticsDAO.findTasks(mostRecentRun.id)

// Assemble workflow report JSON
(state, beginTime, endTime) = voxelyticsService.aggregateBeginEndTime(runs)
result = Json.obj(
"config" -> voxelyticsService.workflowConfigPublicWrites(mostRecentRun.workflow_config, tasks),
"artifacts" -> voxelyticsService.artifactsPublicWrites(artifacts),
"run" -> voxelyticsService.runPublicWrites(
mostRecentRun.copy(state = state, beginTime = beginTime, endTime = endTime),
combinedTaskRuns),
"runs" -> sortedRuns,
"tasks" -> voxelyticsService.taskRunsPublicWrites(combinedTaskRuns, allTaskRuns),
"workflow" -> Json.obj(
"name" -> workflow.name,
"hash" -> workflowHash,
Expand All @@ -159,87 +158,78 @@ class VoxelyticsController @Inject()(

def storeWorkflowEvents(workflowHash: String, runName: String): Action[List[WorkflowEvent]] =
sil.SecuredAction.async(validateJson[List[WorkflowEvent]]) { implicit request =>
def createWorkflowEvent(runId: ObjectId, event: WorkflowEvent): Fox[Unit] =
event match {
case ev: RunStateChangeEvent => voxelyticsDAO.upsertRunStateChangeEvent(runId, ev)
def createWorkflowEvent(runId: ObjectId, events: List[WorkflowEvent]): Fox[Unit] =
if (events.nonEmpty) {
events.head match {
case _: RunStateChangeEvent =>
voxelyticsDAO.upsertRunStateChangeEvents(runId, events.map(_.asInstanceOf[RunStateChangeEvent]))

case ev: TaskStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
_ <- voxelyticsDAO.upsertTaskStateChangeEvent(taskId, ev)
_ <- Fox.combined(
ev.artifacts
.map(artifactKV => {
val artifactName = artifactKV._1
val artifact = artifactKV._2
voxelyticsDAO.upsertArtifact(taskId,
artifactName,
artifact.path,
artifact.file_size,
artifact.inode_count,
artifact.version,
artifact.metadataAsJson)
})
.toList)
} yield ()
case _: TaskStateChangeEvent =>
val taskEvents = events.map(_.asInstanceOf[TaskStateChangeEvent])
val artifactEvents =
taskEvents.flatMap(ev => ev.artifacts.map(artifact => (ev.taskName, artifact._1, artifact._2)))
for {
_ <- voxelyticsDAO.upsertTaskStateChangeEvents(runId, taskEvents)
_ <- if (artifactEvents.nonEmpty) { voxelyticsDAO.upsertArtifacts(runId, artifactEvents) } else {
Fox.successful(())
}
} yield ()

case ev: ChunkStateChangeEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.upsertChunk(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkStateChangeEvent(chunkId, ev)
} yield ()
case _: ChunkStateChangeEvent =>
voxelyticsDAO.upsertChunkStateChangeEvents(runId, events.map(_.asInstanceOf[ChunkStateChangeEvent]))

case ev: RunHeartbeatEvent => voxelyticsDAO.upsertRunHeartbeatEvent(runId, ev)
case _: RunHeartbeatEvent =>
voxelyticsDAO.upsertRunHeartbeatEvents(runId, events.map(_.asInstanceOf[RunHeartbeatEvent]))

case ev: ChunkProfilingEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
chunkId <- voxelyticsDAO.getChunkIdByName(taskId, ev.executionId, ev.chunkName)
_ <- voxelyticsDAO.upsertChunkProfilingEvent(chunkId, ev)
} yield ()
case _: ChunkProfilingEvent =>
voxelyticsDAO.upsertChunkProfilingEvents(runId, events.map(_.asInstanceOf[ChunkProfilingEvent]))

case ev: ArtifactFileChecksumEvent =>
for {
taskId <- voxelyticsDAO.getTaskIdByName(ev.taskName, runId)
artifactId <- voxelyticsDAO.getArtifactIdByName(taskId, ev.artifactName)
_ <- voxelyticsDAO.upsertArtifactChecksumEvent(artifactId, ev)
} yield ()
}
case _: ArtifactFileChecksumEvent =>
voxelyticsDAO.upsertArtifactChecksumEvents(runId, events.map(_.asInstanceOf[ArtifactFileChecksumEvent]))
}
} else { Fox.successful(()) }

val groupedEvents = request.body.groupBy(_.getClass)

for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runId <- voxelyticsDAO.getRunIdByName(runName, request.identity._organization) ?~> "voxelytics.runNotFound" ~> NOT_FOUND
_ <- voxelyticsService.checkAuth(runId, request.identity) ~> UNAUTHORIZED
_ <- Fox.serialCombined(request.body)(event => createWorkflowEvent(runId, event))
// Also checks authorization
runId <- voxelyticsDAO.getRunIdByNameAndWorkflowHash(runName, workflowHash, request.identity) ?~> "voxelytics.runNotFound" ~> NOT_FOUND
_ <- Fox.serialCombined(groupedEvents.values.toList)(eventGroup => createWorkflowEvent(runId, eventGroup)) ~> INTERNAL_SERVER_ERROR
} yield Ok
}

def getChunkStatistics(workflowHash: String, runId: String, taskName: String): Action[AnyContent] =
def getChunkStatistics(workflowHash: String, runIdOpt: Option[String], taskName: String): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND
results <- voxelyticsDAO.getChunkStatistics(taskId)
runIdOptValidated <- Fox.runOptional(runIdOpt)(ObjectId.fromString)
runs <- voxelyticsDAO.findRuns(request.identity,
runIdOptValidated.map(List(_)),
Some(workflowHash),
conf.staleTimeout,
allowUnlisted = true)
results <- voxelyticsDAO.getChunkStatistics(runs.map(_.id), taskName, conf.staleTimeout)
} yield JsonOk(Json.toJson(results))
}
}

def getArtifactChecksums(workflowHash: String,
runId: String,
runIdOpt: Option[String],
taskName: String,
artifactName: Option[String]): Action[AnyContent] =
sil.SecuredAction.async { implicit request =>
{
for {
_ <- bool2Fox(wkConf.Features.voxelyticsEnabled) ?~> "voxelytics.disabled"
runIdValidated <- ObjectId.fromString(runId)
_ <- voxelyticsService.checkAuth(runIdValidated, request.identity) ~> UNAUTHORIZED
taskId <- voxelyticsDAO.getTaskIdByName(taskName, runIdValidated) ?~> "voxelytics.taskNotFound" ~> NOT_FOUND
results <- voxelyticsDAO.getArtifactChecksums(taskId, artifactName)
runIdOptValidated <- Fox.runOptional(runIdOpt)(ObjectId.fromString)
runs <- voxelyticsDAO.findRuns(request.identity,
runIdOptValidated.map(List(_)),
Some(workflowHash),
conf.staleTimeout,
allowUnlisted = true)
results <- voxelyticsDAO.getArtifactChecksums(runs.map(_.id), taskName, artifactName, conf.staleTimeout)
} yield JsonOk(Json.toJson(results))
}
}
Expand Down
10 changes: 2 additions & 8 deletions app/models/binary/explore/N5MultiscalesExplorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,7 @@ import com.scalableminds.webknossos.datastore.dataformats.MagLocator
import com.scalableminds.webknossos.datastore.dataformats.n5.{N5DataLayer, N5Layer, N5SegmentationLayer}
import com.scalableminds.webknossos.datastore.dataformats.zarr.FileSystemCredentials
import com.scalableminds.webknossos.datastore.datareaders.AxisOrder
import com.scalableminds.webknossos.datastore.datareaders.n5.{
N5Header,
N5Metadata,
N5MultiscalesDataset,
N5MultiscalesItem,
N5Transform
}
import com.scalableminds.webknossos.datastore.datareaders.n5._
import com.scalableminds.webknossos.datastore.models.datasource.Category
import net.liftweb.util.Helpers.tryo

Expand All @@ -26,7 +20,7 @@ class N5MultiscalesExplorer extends RemoteLayerExplorer with FoxImplicits {
override def explore(remotePath: Path, credentials: Option[FileSystemCredentials]): Fox[List[(N5Layer, Vec3Double)]] =
for {
zattrsPath <- Fox.successful(remotePath.resolve(N5Metadata.FILENAME_ATTRIBUTES_JSON))
n5Metadata <- parseJsonFromPath[N5Metadata](zattrsPath) ?~> s"Failed to read OME NGFF header at $zattrsPath"
n5Metadata <- parseJsonFromPath[N5Metadata](zattrsPath) ?~> s"Failed to read N5 header at $zattrsPath"
layers <- Fox.serialCombined(n5Metadata.multiscales)(layerFromN5MultiscalesItem(_, remotePath, credentials))
} yield layers

Expand Down
Loading