Enable Datastore to watch multiple directories #3330

Merged 8 commits on Oct 11, 2018

Changes from 6 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.md).
### Added

- Added support for duplicate dataset names for different organizations [#3137](https://github.com/scalableminds/webknossos/pull/3137)
- Added support for watching additional dataset directories, automatically creating symbolic links to the main directory [#3330](https://github.com/scalableminds/webknossos/pull/3330)

### Changed

1 change: 1 addition & 0 deletions conf/application.conf
@@ -80,6 +80,7 @@ braingames.binary {
loadTimeout = 10 # in seconds
saveTimeout = 10 # in seconds
baseFolder = binaryData
additionalFolders = []

changeHandler {
enabled = true
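As a usage sketch (the mount point below is invented, not part of this PR): a deployment that keeps extra datasets outside of the base folder could list those directories here. Each entry is expected to contain the usual <organization>/<dataset> layout and is symlinked into baseFolder on every inbox scan.

braingames.binary {
  baseFolder = binaryData
  # hypothetical extra location; its <organization>/<dataset> directories are symlinked into baseFolder
  additionalFolders = ["/mnt/extra-datasets"]
}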
15 changes: 15 additions & 0 deletions util/src/main/scala/com/scalableminds/util/io/PathUtils.scala
@@ -90,4 +90,19 @@ trait PathUtils extends LazyLogging {
Files.createDirectories(path)
path
}

// lists directory entries without following symlinks, so dangling links are included
def listDirectoriesRaw(directory: Path): Box[List[Path]] = {
try {
val directoryStream = Files.walk(directory, 1)
val r = directoryStream.iterator().asScala.toList
directoryStream.close()
Full(r)
} catch {
case ex: Exception =>
val errorMsg = s"Error: ${ex.getClass.getCanonicalName} - ${ex.getMessage}. Directory: ${directory.toAbsolutePath}"
logger.error(errorMsg)
Failure(errorMsg)
}
}
}
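A minimal usage sketch for the new helper (the path is hypothetical, and Full is assumed to be the lift Box constructor this trait already returns): because the listing does not follow symlinks, dangling links remain in the result and can be inspected by the caller.

import java.nio.file.{Files, Paths}
import net.liftweb.common.Full

PathUtils.listDirectoriesRaw(Paths.get("binaryData/sampleOrganization")) match {
  case Full(entries) =>
    // unlike listDirectories, dangling symlinks are still present here
    entries.filter(p => Files.isSymbolicLink(p)).foreach(p => println(s"symlink: $p"))
  case other =>
    println(s"listing failed: $other")
}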
@@ -21,6 +21,7 @@ class DataStoreConfig @Inject()(configuration: Configuration) extends ConfigRead
val tickerInterval = get[Int]("braingames.binary.changeHandler.tickerInterval") minutes
}
val baseFolder = get[String]("braingames.binary.baseFolder")
val additionalFolders = get[Seq[String]]("braingames.binary.additionalFolders").toList
val loadTimeout = get[Int]("braingames.binary.loadTimeout") seconds
val cacheMaxSize = get[Int]("braingames.binary.cacheMaxSize")

@@ -15,6 +15,7 @@ class DataStoreModule(environment: Environment, configuration: Configuration) ex

def configure() = {
bind(classOf[DataStoreConfig]).asEagerSingleton()
bind(classOf[BaseDirService]).asEagerSingleton()
bind(classOf[AccessTokenService]).asEagerSingleton()
bind(classOf[ActorSystem]).annotatedWith(Names.named("webknossos-datastore")).toInstance(system)
bind(classOf[BinaryDataService]).asEagerSingleton()
@@ -0,0 +1,73 @@
package com.scalableminds.webknossos.datastore.services

import java.io.IOException
import java.nio.file.{Files, Path, Paths}

import com.scalableminds.util.io.PathUtils
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.DataStoreConfig
import com.typesafe.scalalogging.LazyLogging
import javax.inject.Inject

import scala.concurrent.ExecutionContext

class BaseDirService @Inject()(config: DataStoreConfig)(implicit ec: ExecutionContext) extends LazyLogging with FoxImplicits {
private val baseDir = Paths.get(config.Braingames.Binary.baseFolder)
private val additionalDirs = config.Braingames.Binary.additionalFolders.map(Paths.get(_))

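// Mirror every <organization>/<dataset> directory found below the configured additional folders into the base folder, then drop links whose targets have disappeared.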
def updateSymlinks(): Fox[Unit] = {
for {
organizationDirectoriesNested <- additionalDirs.map(dir => PathUtils.listDirectories(dir)).toSingleBox("listDirectories failed").toFox
datasetDirectoriesNested <- organizationDirectoriesNested.flatten.map(dir => PathUtils.listDirectories(dir)).toSingleBox("listDirectories failed").toFox
_ <- createMissingSymlinks(datasetDirectoriesNested.flatten)
_ <- cleanUpDanglingSymlinks()
} yield ()
}

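// For each dataset directory found in an additional folder, ensure a symlink with the same <organization>/<dataset> path exists under the base folder.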
private def createMissingSymlinks(datasetDirectories: List[Path]) = {
Fox.serialCombined(datasetDirectories) {
directory => createSymlink(baseDir.resolve(directory.getParent.getFileName).resolve(directory.getFileName), directory)
}
}

private def createSymlink(linkLocation: Path, actualDirLocation: Path) = {
try {
if (Files.exists(linkLocation)) {
if (Files.isSymbolicLink(linkLocation) && Files.isSameFile(Files.readSymbolicLink(linkLocation), actualDirLocation)) {
// link is already in place, pass.
} else {
logger.warn(s"Could not create symlink at $linkLocation pointing to $actualDirLocation because there is already something else there.")
}
Fox.successful(())
} else {
logger.info(s"Creating symlink at $linkLocation pointing to $actualDirLocation")
Fox.successful(Files.createSymbolicLink(linkLocation.toAbsolutePath, actualDirLocation.toAbsolutePath))
}
} catch {
case e: IOException => Fox.failure(s"Failed to create symbolic link at $linkLocation ${e.getMessage}")
}
}

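// Remove symlinks under the base folder whose targets no longer exist, e.g. after a dataset was deleted from an additional folder.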
private def cleanUpDanglingSymlinks() = {
for {
organizationDirectories <- PathUtils.listDirectories(baseDir).toFox
dataSetDirectories <- organizationDirectories.map(dir => PathUtils.listDirectoriesRaw(dir)).toSingleBox("listDirectories failed").toFox
_ <- Fox.serialCombined(dataSetDirectories.flatten)(cleanUpIfDanglingSymlink)
} yield ()
}

private def cleanUpIfDanglingSymlink(directory: Path): Fox[Unit] = {
try {
logger.info(s"testing for symlinkness: $directory")
if (Files.isSymbolicLink(directory) && Files.notExists(Files.readSymbolicLink(directory))) {
logger.info(s"Deleting dangling symbolic link at $directory which pointed to ${Files.readSymbolicLink(directory)}")
Fox.successful(Files.delete(directory))
} else {
Fox.successful(())
}
} catch {
case e: IOException => Fox.failure(s"Failed to analyze possible symlink for cleanup at $directory: ${e.getMessage}")
}
}

}
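To illustrate the resulting layout (directory names invented for the example): a dataset found below an additional folder is mirrored by a symlink of the same relative path under the base folder, and the link is removed again once its target disappears.

/mnt/extra-datasets/sampleOrganization/dataset1                                              # real data in an additional folder
binaryData/sampleOrganization/dataset1 -> /mnt/extra-datasets/sampleOrganization/dataset1    # symlink created by updateSymlinks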
@@ -28,6 +28,7 @@ class DataSourceService @Inject()(
config: DataStoreConfig,
dataSourceRepository: DataSourceRepository,
val lifecycle: ApplicationLifecycle,
baseDirService: BaseDirService,
@Named("webknossos-datastore") val system: ActorSystem
) extends IntervalScheduler with LazyLogging with FoxImplicits {

@@ -43,23 +44,26 @@

def checkInbox(): Fox[Unit] = {
logger.info(s"Scanning inbox at: $dataBaseDir")
PathUtils.listDirectories(dataBaseDir) match {
case Full(dirs) =>
for {
_ <- Fox.successful(())
foundInboxSources = dirs.flatMap(teamAwareInboxSources)
dataSourceString = foundInboxSources.map { ds =>
s"'${ds.id.team}/${ds.id.name}' (${if (ds.isUsable) "active" else "inactive"})"
}.mkString(", ")

_ = logger.info(s"Finished scanning inbox: $dataSourceString")
_ <- dataSourceRepository.updateDataSources(foundInboxSources)
} yield ()
case e =>
val errorMsg = s"Failed to scan inbox. Error during list directories on '$dataBaseDir': $e"
logger.error(errorMsg)
Fox.failure(errorMsg)
}
for {
_ <- baseDirService.updateSymlinks ?~> "Failed to update dataset symbolic links"
_ <- PathUtils.listDirectories(dataBaseDir) match {
case Full(dirs) =>
for {
_ <- Fox.successful(())
foundInboxSources = dirs.flatMap(teamAwareInboxSources)
dataSourceString = foundInboxSources.map { ds =>
s"'${ds.id.team}/${ds.id.name}' (${if (ds.isUsable) "active" else "inactive"})"
}.mkString(", ")

_ = logger.info(s"Finished scanning inbox: $dataSourceString")
_ <- dataSourceRepository.updateDataSources(foundInboxSources)
} yield ()
case e =>
val errorMsg = s"Failed to scan inbox. Error during list directories on '$dataBaseDir': $e"
logger.error(errorMsg)
Fox.failure(errorMsg)
}
} yield ()
}

def handleUpload(id: DataSourceId, dataSetZip: File): Fox[Unit] = {
1 change: 1 addition & 0 deletions webknossos-datastore/conf/standalone-datastore.conf
@@ -34,6 +34,7 @@ braingames.binary {
loadTimeout = 10 # in seconds
saveTimeout = 10 # in seconds
baseFolder = binaryData
additionalFolders = []

changeHandler {
enabled = true