Skip to content

Commit

Permalink
Enable Datastore to watch multiple directories (#3330)
Browse files Browse the repository at this point in the history
* enable datastore to watch multiple directories

* [WIP] watch additional directories

* update symbolic links when scanning inbox

* update changelog

* clean up dangling symlinks
  • Loading branch information
fm3 authored Oct 11, 2018
1 parent 59c4b9a commit 1141949
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.md).
### Added

- Added support for duplicate dataset names for different organizations [#3137](https://github.com/scalableminds/webknossos/pull/3137)
- Added support to watch additional dataset directories, automatically creating symbolic links to the main directory [#3330](https://github.com/scalableminds/webknossos/pull/3330)
- A user can now have multiple layouts for tracing views. [#3299](https://github.com/scalableminds/webknossos/pull/3299)

### Changed
Expand Down
1 change: 1 addition & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ braingames.binary {
loadTimeout = 10 # in seconds
saveTimeout = 10 # in seconds
baseFolder = binaryData
additionalFolders = []

changeHandler {
enabled = true
Expand Down
15 changes: 15 additions & 0 deletions util/src/main/scala/com/scalableminds/util/io/PathUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,19 @@ trait PathUtils extends LazyLogging {
Files.createDirectories(path)
path
}

// Lists the entries directly inside `directory` (depth 1), without following symlinks.
// NOTE(review): Files.walk with maxDepth 1 also yields the start directory itself as
// the first element — callers are expected to handle/ignore that entry.
def listDirectoriesRaw(directory: Path): Box[List[Path]] = {
  // Fix: the original declared `= Box[List[Path]] { ... }`, which *applies* Box to the
  // block instead of annotating the return type. Correct form is `: Box[List[Path]] =`.
  try {
    val directoryStream = Files.walk(directory, 1)
    try {
      Full(directoryStream.iterator().asScala.toList)
    } finally {
      // Close in `finally` so the underlying directory handle is released even if
      // materializing the list throws (the original leaked the stream in that case).
      directoryStream.close()
    }
  } catch {
    case ex: Exception =>
      val errorMsg = s"Error: ${ex.getClass.getCanonicalName} - ${ex.getMessage}. Directory: ${directory.toAbsolutePath}"
      // Log the full message (the original logged only the exception class name).
      logger.error(errorMsg)
      Failure(errorMsg)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class DataStoreConfig @Inject()(configuration: Configuration) extends ConfigRead
val tickerInterval = get[Int]("braingames.binary.changeHandler.tickerInterval") minutes
}
val baseFolder = get[String]("braingames.binary.baseFolder")
val additionalFolders = get[Seq[String]]("braingames.binary.additionalFolders").toList
val loadTimeout = get[Int]("braingames.binary.loadTimeout") seconds
val cacheMaxSize = get[Int]("braingames.binary.cacheMaxSize")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class DataStoreModule(environment: Environment, configuration: Configuration) ex

def configure() = {
bind(classOf[DataStoreConfig]).asEagerSingleton()
bind(classOf[BaseDirService]).asEagerSingleton()
bind(classOf[AccessTokenService]).asEagerSingleton()
bind(classOf[ActorSystem]).annotatedWith(Names.named("webknossos-datastore")).toInstance(system)
bind(classOf[BinaryDataService]).asEagerSingleton()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.scalableminds.webknossos.datastore.services

import java.io.IOException
import java.nio.file.{Files, Path, Paths}

import com.scalableminds.util.io.PathUtils
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject

import scala.concurrent.ExecutionContext

/**
  * Keeps the main dataset base directory in sync with additional watched directories
  * by maintaining symbolic links: for every organization/dataset directory found in an
  * additional folder, a symlink is created under baseFolder, and symlinks whose targets
  * have disappeared are removed again.
  */
class BaseDirService @Inject()(config: DataStoreConfig)(implicit ec: ExecutionContext) extends LazyLogging with FoxImplicits {
  private val baseDir = Paths.get(config.Braingames.Binary.baseFolder)
  private val additionalDirs = config.Braingames.Binary.additionalFolders.map(Paths.get(_))

  /**
    * Scans all additional folders (layout: <additionalFolder>/<organization>/<dataset>),
    * creates any missing symlinks in the base directory, then removes dangling ones.
    */
  def updateSymlinks(): Fox[Unit] =
    for {
      organizationDirectoriesNested <- additionalDirs.map(dir => PathUtils.listDirectories(dir)).toSingleBox("listDirectories failed").toFox
      datasetDirectoriesNested <- organizationDirectoriesNested.flatten.map(dir => PathUtils.listDirectories(dir)).toSingleBox("listDirectories failed").toFox
      _ <- createMissingSymlinks(datasetDirectoriesNested.flatten)
      _ <- cleanUpDanglingSymlinks()
    } yield ()

  // Links baseDir/<organization>/<dataset> -> <additionalFolder>/<organization>/<dataset>,
  // sequentially so failures are reported per directory.
  private def createMissingSymlinks(datasetDirectories: List[Path]): Fox[List[Unit]] =
    Fox.serialCombined(datasetDirectories) { directory =>
      createSymlink(baseDir.resolve(directory.getParent.getFileName).resolve(directory.getFileName), directory)
    }

  // Creates a single symlink, tolerating one that is already correctly in place.
  private def createSymlink(linkLocation: Path, actualDirLocation: Path): Fox[Unit] = {
    try {
      // isSymbolicLink does not follow links, so a *dangling* symlink at linkLocation is
      // also detected here. The original used only Files.exists (which follows links),
      // so createSymbolicLink would throw FileAlreadyExistsException on a dangling link.
      if (Files.isSymbolicLink(linkLocation) || Files.exists(linkLocation)) {
        if (Files.isSymbolicLink(linkLocation)
            && Files.exists(linkLocation) // guard: isSameFile throws if the link target is gone
            && Files.isSameFile(Files.readSymbolicLink(linkLocation), actualDirLocation)) {
          // link is already in place, pass.
        } else {
          logger.warn(s"Could not create symlink at $linkLocation pointing to $actualDirLocation because there is already something else there.")
        }
        Fox.successful(())
      } else {
        logger.info(s"Creating symlink at $linkLocation pointing to $actualDirLocation")
        // Ensure the organization directory exists under baseDir; createSymbolicLink
        // fails with NoSuchFileException if the parent is missing.
        Files.createDirectories(linkLocation.toAbsolutePath.getParent)
        Files.createSymbolicLink(linkLocation.toAbsolutePath, actualDirLocation.toAbsolutePath)
        Fox.successful(())
      }
    } catch {
      case e: IOException => Fox.failure(s"Failed to create symbolic link at $linkLocation ${e.getMessage}")
    }
  }

  // Walks all organization/dataset entries of baseDir (raw: not following symlinks)
  // and deletes symlinks whose target no longer exists.
  private def cleanUpDanglingSymlinks(): Fox[Unit] =
    for {
      organizationDirectories <- PathUtils.listDirectories(baseDir).toFox
      dataSetDirectories <- organizationDirectories.map(dir => PathUtils.listDirectoriesRaw(dir)).toSingleBox("listDirectories failed").toFox
      _ <- Fox.serialCombined(dataSetDirectories.flatten)(cleanUpIfDanglingSymlink)
    } yield ()

  // Deletes `directory` iff it is a symbolic link whose target is gone.
  private def cleanUpIfDanglingSymlink(directory: Path): Fox[Unit] = {
    try {
      if (Files.isSymbolicLink(directory)) {
        // Read the target once (the original resolved it twice, for check and for log).
        val target = Files.readSymbolicLink(directory)
        if (Files.notExists(target)) {
          logger.info(s"Deleting dangling symbolic link at $directory which pointed to $target")
          Files.delete(directory)
        }
      }
      // Removed the per-directory "testing for symlinkness" info log — leftover debug
      // output that fired for every dataset directory on every inbox scan.
      Fox.successful(())
    } catch {
      case e: IOException => Fox.failure(s"Failed to analyze possible symlink for cleanup at $directory: ${e.getMessage}")
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class DataSourceService @Inject()(
config: DataStoreConfig,
dataSourceRepository: DataSourceRepository,
val lifecycle: ApplicationLifecycle,
baseDirService: BaseDirService,
@Named("webknossos-datastore") val system: ActorSystem
) extends IntervalScheduler with LazyLogging with FoxImplicits {

Expand All @@ -43,23 +44,26 @@ class DataSourceService @Inject()(

/**
  * Scans the inbox (base directory) for datasets and reports them to the repository.
  * First refreshes the symlinks for additionally watched directories, so newly added
  * datasets in those folders are picked up by the same scan.
  *
  * NOTE(review): the scraped diff rendered the old and new method bodies back to back
  * without +/- markers; this is the reconstructed post-change implementation.
  */
def checkInbox(): Fox[Unit] = {
  logger.info(s"Scanning inbox at: $dataBaseDir")
  for {
    _ <- baseDirService.updateSymlinks ?~> "Failed to update dataset symbolic links"
    _ <- PathUtils.listDirectories(dataBaseDir) match {
      case Full(dirs) =>
        for {
          _ <- Fox.successful(())
          foundInboxSources = dirs.flatMap(teamAwareInboxSources)
          // Human-readable summary, e.g. "'org/dataset' (active), 'org/other' (inactive)"
          dataSourceString = foundInboxSources.map { ds =>
            s"'${ds.id.team}/${ds.id.name}' (${if (ds.isUsable) "active" else "inactive"})"
          }.mkString(", ")

          _ = logger.info(s"Finished scanning inbox: $dataSourceString")
          _ <- dataSourceRepository.updateDataSources(foundInboxSources)
        } yield ()
      case e =>
        val errorMsg = s"Failed to scan inbox. Error during list directories on '$dataBaseDir': $e"
        logger.error(errorMsg)
        Fox.failure(errorMsg)
    }
  } yield ()
}

def handleUpload(id: DataSourceId, dataSetZip: File): Fox[Unit] = {
Expand Down
1 change: 1 addition & 0 deletions webknossos-datastore/conf/standalone-datastore.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ braingames.binary {
loadTimeout = 10 # in seconds
saveTimeout = 10 # in seconds
baseFolder = binaryData
additionalFolders = []

changeHandler {
enabled = true
Expand Down

0 comments on commit 1141949

Please sign in to comment.