Asynchronous cache for zarr chunk loading (#6165)
* [WIP] Asynchronous cache for zarr chunk loading

* re-add exception handling

* clean up
fm3 authored Apr 26, 2022
1 parent 6bbafb6 commit d42694a
Showing 9 changed files with 98 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.unreleased.md
@@ -19,6 +19,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
- Add filter functionality to "Access Permissions" column to filter for public datasets.
- Removed `isActive` and `isPublic` columns to save screen space.
- Changed data layer entries to display layer names instead of categories, e.g. "color" --> "axons".
- Sped up initial requests for remote zarr datasets by using asynchronous caching. [#6165](https://github.com/scalableminds/webknossos/pull/6165)

### Fixed
- Fixed a bug that led to an error when drag-'n-dropping an empty volume annotation in the dataset view. [#6116](https://github.com/scalableminds/webknossos/pull/6116)
2 changes: 2 additions & 0 deletions project/Dependencies.scala
@@ -10,6 +10,7 @@ object Dependencies {
private val akkaLogging = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
private val akkaTest = "com.typesafe.akka" %% "akka-testkit" % akkaVersion
private val akkaHttp = "com.typesafe.akka" %% "akka-http-core" % akkaHttpVersion
private val akkaCaching = "com.typesafe.akka" %% "akka-http-caching" % akkaHttpVersion
private val commonsCodec = "commons-codec" % "commons-codec" % "1.10"
private val commonsEmail = "org.apache.commons" % "commons-email" % "1.5"
private val commonsIo = "commons-io" % "commons-io" % "2.9.0"
@@ -76,6 +77,7 @@ object Dependencies {
grpcServices,
scalapbRuntimeGrpc,
akkaLogging,
akkaCaching,
ehcache,
gson,
webknossosWrap,
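
The new `akkaCaching` dependency pulls in akka-http-caching, whose `LfuCache` backs the chunk cache added to `ZarrArray` below. A minimal, standalone sketch of that API (key/value types and the loader function are illustrative, not part of this commit):

```scala
import akka.http.caching.LfuCache
import akka.http.caching.scaladsl.{Cache, CachingSettings}
import scala.concurrent.Future
import scala.concurrent.duration._

object LfuCacheSketch {
  // Build an LFU cache bounded by entry count, with time-to-live / time-to-idle eviction.
  def makeCache(maxEntries: Int): Cache[String, Array[Byte]] = {
    val defaults = CachingSettings("")
    val lfuSettings = defaults.lfuCacheSettings
      .withInitialCapacity(maxEntries)
      .withMaxCapacity(maxEntries)
      .withTimeToLive(2.hours)
      .withTimeToIdle(1.hour)
    LfuCache[String, Array[Byte]](defaults.withLfuCacheSettings(lfuSettings))
  }

  // getOrLoad caches the Future itself: concurrent requests for the same key
  // share one in-flight load instead of each triggering their own.
  def readCached(cache: Cache[String, Array[Byte]],
                 key: String,
                 load: String => Future[Array[Byte]]): Future[Array[Byte]] =
    cache.getOrLoad(key, load)
}
```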
@@ -1,9 +1,11 @@
package com.scalableminds.webknossos.datastore.dataformats

import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.models.BucketPosition
import net.liftweb.common.Box

import scala.concurrent.ExecutionContext

// To be implemented as handle for a cube (e.g. may correspond to one 1GB wkw file)
trait DataCubeHandle extends SafeCachable {
def cutOutBucket(bucket: BucketPosition): Box[Array[Byte]]
def cutOutBucket(bucket: BucketPosition)(implicit ec: ExecutionContext): Fox[Array[Byte]]
}
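
The handle contract switches from a synchronous `Box` to `Fox`, scalableminds' asynchronous result type (roughly a `Future`-wrapped `Box`), with the `ExecutionContext` passed implicitly. A hypothetical implementation, only to illustrate the new signature:

```scala
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.DataCubeHandle
import com.scalableminds.webknossos.datastore.models.BucketPosition
import scala.concurrent.ExecutionContext

// Hypothetical handle (not part of this commit) illustrating the new contract:
// implementations now produce a Fox, so the result may become available asynchronously.
class InMemoryCubeHandle(bucketData: Array[Byte]) extends DataCubeHandle {
  def cutOutBucket(bucket: BucketPosition)(implicit ec: ExecutionContext): Fox[Array[Byte]] =
    Fox.successful(bucketData) // already-available data, lifted into the asynchronous type

  override protected def onFinalize(): Unit = ()
}
```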
@@ -1,20 +1,23 @@
package com.scalableminds.webknossos.datastore.dataformats.wkw

import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle}
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.wrap.WKWFile
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Box, Empty}

class WKWCubeHandle(wkwFile: WKWFile) extends DataCubeHandle {
import scala.concurrent.{ExecutionContext, Future}

def cutOutBucket(bucket: BucketPosition): Box[Array[Byte]] = {
class WKWCubeHandle(wkwFile: WKWFile) extends DataCubeHandle with FoxImplicits {

def cutOutBucket(bucket: BucketPosition)(implicit ec: ExecutionContext): Fox[Array[Byte]] = {
val numBlocksPerCubeDimension = wkwFile.header.numBlocksPerCubeDimension
val blockOffsetX = bucket.x % numBlocksPerCubeDimension
val blockOffsetY = bucket.y % numBlocksPerCubeDimension
val blockOffsetZ = bucket.z % numBlocksPerCubeDimension
wkwFile.readBlock(blockOffsetX, blockOffsetY, blockOffsetZ)
Fox(Future.successful(wkwFile.readBlock(blockOffsetX, blockOffsetY, blockOffsetZ)))
}

override protected def onFinalize(): Unit =
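
The WKW read itself stays synchronous; its `Box` result is merely lifted into the new `Fox` return type via an already-completed `Future`. The same lifting pattern, written out generically as a hedged sketch:

```scala
import com.scalableminds.util.tools.Fox
import net.liftweb.common.Box
import scala.concurrent.{ExecutionContext, Future}

// Hedged sketch of the pattern used above: a synchronous Box result becomes a Fox
// by wrapping it in a Future that is already completed.
def liftBox[T](box: => Box[T])(implicit ec: ExecutionContext): Fox[T] =
  Fox(Future.successful(box))
```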
@@ -4,21 +4,26 @@ import java.nio.file.{FileSystem, Path}

import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.requestlogging.RateLimitedErrorLogging
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.{BucketProvider, DataCubeHandle}
import com.scalableminds.webknossos.datastore.jzarr.ZarrArray
import com.scalableminds.webknossos.datastore.models.BucketPosition
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.scalableminds.webknossos.datastore.storage.FileSystemsHolder
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.Box.tryo
import net.liftweb.common.{Box, Empty}
import net.liftweb.common.{Box, Empty, Failure, Full}

import scala.concurrent.ExecutionContext

class ZarrCubeHandle(zarrArray: ZarrArray) extends DataCubeHandle with LazyLogging with RateLimitedErrorLogging {

def cutOutBucket(bucket: BucketPosition): Box[Array[Byte]] = {
def cutOutBucket(bucket: BucketPosition)(implicit ec: ExecutionContext): Fox[Array[Byte]] = {
val shape = Vec3Int.full(bucket.bucketLength)
val offset = Vec3Int(bucket.globalXInMag, bucket.globalYInMag, bucket.globalZInMag)
tryo(onError = e => logError(e))(zarrArray.readBytesXYZ(shape, offset))
zarrArray.readBytesXYZ(shape, offset).recover {
case t: Throwable => logError(t); return Failure(t.getMessage, Full(t), Empty)
}
}

override protected def onFinalize(): Unit = ()

This file was deleted.

@@ -2,9 +2,11 @@ package com.scalableminds.webknossos.datastore.jzarr

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException}

import com.typesafe.scalalogging.LazyLogging
import javax.imageio.stream.MemoryCacheImageInputStream
import ucar.ma2.{Array => MultiArray, DataType => MADataType}

import scala.concurrent.Future
import scala.util.Using

object ChunkReader {
@@ -25,7 +27,7 @@ trait ChunkReader {
lazy val chunkSize: Int = header.chunks.toList.product

@throws[IOException]
def read(path: String): MultiArray
def read(path: String): Future[MultiArray]

protected def readBytes(path: String): Option[Array[Byte]] =
Using.Manager { use =>
@@ -43,18 +45,18 @@ trait ChunkReader {
class ByteChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {
val ma2DataType: MADataType = MADataType.BYTE

override def read(path: String): MultiArray =
readBytes(path).map { bytes =>
override def read(path: String): Future[MultiArray] =
Future.successful(readBytes(path).map { bytes =>
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, bytes)
}.getOrElse(createFilled(ma2DataType))
}.getOrElse(createFilled(ma2DataType)))
}

class DoubleChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {

val ma2DataType: MADataType = MADataType.DOUBLE

override def read(path: String): MultiArray =
Using.Manager { use =>
override def read(path: String): Future[MultiArray] =
Future.successful(Using.Manager { use =>
readBytes(path).map { bytes =>
val typedStorage = new Array[Double](chunkSize)
val bais = use(new ByteArrayInputStream(bytes))
@@ -63,15 +65,15 @@ class DoubleChunkReader(val store: Store, val header: ZarrHeader) extends ChunkR
iis.readFully(typedStorage, 0, typedStorage.length)
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, typedStorage)
}.getOrElse(createFilled(ma2DataType))
}.get
}.get)
}

class ShortChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {
class ShortChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader with LazyLogging {

val ma2DataType: MADataType = MADataType.SHORT

override def read(path: String): MultiArray =
Using.Manager { use =>
override def read(path: String): Future[MultiArray] =
Future.successful(Using.Manager { use =>
readBytes(path).map { bytes =>
val typedStorage = new Array[Short](chunkSize)
val bais = use(new ByteArrayInputStream(bytes))
@@ -80,15 +82,15 @@ class ShortChunkReader(val store: Store, val header: ZarrHeader) extends ChunkRe
iis.readFully(typedStorage, 0, typedStorage.length)
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, typedStorage)
}.getOrElse(createFilled(ma2DataType))
}.get
}.get)
}

class IntChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {

val ma2DataType: MADataType = MADataType.INT

override def read(path: String): MultiArray =
Using.Manager { use =>
override def read(path: String): Future[MultiArray] =
Future.successful(Using.Manager { use =>
readBytes(path).map { bytes =>
val typedStorage = new Array[Int](chunkSize)
val bais = use(new ByteArrayInputStream(bytes))
@@ -97,15 +99,15 @@ class IntChunkReader(val store: Store, val header: ZarrHeader) extends ChunkRead
iis.readFully(typedStorage, 0, typedStorage.length)
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, typedStorage)
}.getOrElse(createFilled(ma2DataType))
}.get
}.get)
}

class LongChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {

val ma2DataType: MADataType = MADataType.LONG

override def read(path: String): MultiArray =
Using.Manager { use =>
override def read(path: String): Future[MultiArray] =
Future.successful(Using.Manager { use =>
readBytes(path).map { bytes =>
val typedStorage = new Array[Long](chunkSize)
val bais = use(new ByteArrayInputStream(bytes))
@@ -114,15 +116,15 @@ class LongChunkReader(val store: Store, val header: ZarrHeader) extends ChunkRea
iis.readFully(typedStorage, 0, typedStorage.length)
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, typedStorage)
}.getOrElse(createFilled(ma2DataType))
}.get
}.get)
}

class FloatChunkReader(val store: Store, val header: ZarrHeader) extends ChunkReader {

val ma2DataType: MADataType = MADataType.FLOAT

override def read(path: String): MultiArray =
Using.Manager { use =>
override def read(path: String): Future[MultiArray] =
Future.successful(Using.Manager { use =>
readBytes(path).map { bytes =>
val typedStorage = new Array[Float](chunkSize)
val bais = use(new ByteArrayInputStream(bytes))
@@ -131,5 +133,5 @@ class FloatChunkReader(val store: Store, val header: ZarrHeader) extends ChunkRe
iis.readFully(typedStorage, 0, typedStorage.length)
MultiArray.factory(ma2DataType, header.chunkShapeOrdered, typedStorage)
}.getOrElse(createFilled(ma2DataType))
}.get
}.get)
}
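
Every reader now returns a `Future[MultiArray]`, even though the work is still done eagerly inside `Future.successful`; the point is that a `Future`-returning loader plugs directly into akka's `Cache.getOrLoad`, as `ZarrArray` does below. A hedged sketch of that composition (names mirror the code that follows):

```scala
import akka.http.caching.scaladsl.Cache
import scala.concurrent.Future
import ucar.ma2.{Array => MultiArray}

// getOrLoad either returns the cached (possibly still in-flight) Future for this key,
// or invokes the reader exactly once and caches the resulting Future.
// ChunkReader is the trait defined above; its read(path) returns Future[MultiArray].
def cachedChunk(chunkContentsCache: Cache[String, MultiArray],
                chunkReader: ChunkReader,
                storeKey: String): Future[MultiArray] =
  chunkContentsCache.getOrLoad(storeKey, chunkReader.read)
```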
@@ -5,11 +5,16 @@ import java.nio.ByteOrder
import java.nio.file.Path
import java.util

import akka.http.caching.LfuCache
import akka.http.caching.scaladsl.{Cache, CachingSettings}
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
import com.typesafe.scalalogging.LazyLogging
import play.api.libs.json.{JsError, JsSuccess, Json}
import ucar.ma2.{InvalidRangeException, Array => MultiArray}

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

object ZarrArray extends LazyLogging {
@@ -49,13 +54,27 @@ class ZarrArray(relativePath: ZarrPath, store: Store, header: ZarrHeader) extend
ChunkReader.create(store, header)

// cache currently limited to 100 MB per array
private lazy val chunkContentsCache: ChunkContentsCache =
new ChunkContentsCache(maxSizeBytes = 1000 * 1000 * 100, bytesPerEntry = header.bytesPerChunk)
private lazy val chunkContentsCache: Cache[String, MultiArray] = {
val maxSizeBytes = 1000 * 1000 * 100

val maxEntries = maxSizeBytes / header.bytesPerChunk
val defaultCachingSettings = CachingSettings("")
val lfuCacheSettings =
defaultCachingSettings.lfuCacheSettings
.withInitialCapacity(maxEntries)
.withMaxCapacity(maxEntries)
.withTimeToLive(2.hours)
.withTimeToIdle(1.hour)
val cachingSettings =
defaultCachingSettings.withLfuCacheSettings(lfuCacheSettings)
val lfuCache: Cache[String, MultiArray] = LfuCache(cachingSettings)
lfuCache
}

// @return Byte array in fortran-order with little-endian values
@throws[IOException]
@throws[InvalidRangeException]
def readBytesXYZ(shape: Vec3Int, offset: Vec3Int): Array[Byte] = {
def readBytesXYZ(shape: Vec3Int, offset: Vec3Int)(implicit ec: ExecutionContext): Fox[Array[Byte]] = {
// Assumes that the last three dimensions of the array are x, y, z
val paddingDimensionsCount = header.shape.length - 3
val offsetArray = Array.fill(paddingDimensionsCount)(0) :+ offset.x :+ offset.y :+ offset.z
@@ -67,39 +86,44 @@ class ZarrArray(relativePath: ZarrPath, store: Store, header: ZarrHeader) extend
// @return Byte array in fortran-order with little-endian values
@throws[IOException]
@throws[InvalidRangeException]
def readBytes(shape: Array[Int], offset: Array[Int]): Array[Byte] = {
val typedData = readAsFortranOrder(shape, offset)
BytesConverter.toByteArray(typedData, header.dataType, ByteOrder.LITTLE_ENDIAN)
}
def readBytes(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext): Fox[Array[Byte]] =
for {
typedData <- readAsFortranOrder(shape, offset)
} yield BytesConverter.toByteArray(typedData, header.dataType, ByteOrder.LITTLE_ENDIAN)

@throws[IOException]
@throws[InvalidRangeException]
def readAsFortranOrder(shape: Array[Int], offset: Array[Int]): Object = {
def readAsFortranOrder(shape: Array[Int], offset: Array[Int])(implicit ec: ExecutionContext): Fox[Object] = {
val buffer = MultiArrayUtils.createDataBuffer(header.dataType, shape)
val chunkIndices = ChunkUtils.computeChunkIndices(header.shape, header.chunks, shape, offset)
chunkIndices.foreach { chunkIndex: Array[Int] =>
val sourceChunk: MultiArray = getSourceChunkDataWithCache(chunkIndex)
val offsetInChunk = computeOffsetInChunk(chunkIndex, offset)
if (partialCopyingIsNotNeeded(shape, offsetInChunk)) {
return sourceChunk.getStorage
} else {
val sourceChunkInCOrder: MultiArray =
if (header.order == ArrayOrder.C)
sourceChunk
else MultiArrayUtils.orderFlippedView(sourceChunk)
val targetInCOrder: MultiArray =
MultiArrayUtils.orderFlippedView(MultiArrayUtils.createArrayWithGivenStorage(buffer, shape.reverse))
MultiArrayUtils.copyRange(offsetInChunk, sourceChunkInCOrder, targetInCOrder)
}
val res = Fox.serialCombined(chunkIndices.toList) { chunkIndex: Array[Int] =>
for {
sourceChunk: MultiArray <- getSourceChunkDataWithCache(chunkIndex)
offsetInChunk = computeOffsetInChunk(chunkIndex, offset)
_ = if (partialCopyingIsNotNeeded(shape, offsetInChunk)) {
return Future.successful(sourceChunk.getStorage)
} else {
val sourceChunkInCOrder: MultiArray =
if (header.order == ArrayOrder.C)
sourceChunk
else MultiArrayUtils.orderFlippedView(sourceChunk)
val targetInCOrder: MultiArray =
MultiArrayUtils.orderFlippedView(MultiArrayUtils.createArrayWithGivenStorage(buffer, shape.reverse))
MultiArrayUtils.copyRange(offsetInChunk, sourceChunkInCOrder, targetInCOrder)
}
} yield ()
}
buffer
for {
_ <- res
} yield buffer
}

private def getSourceChunkDataWithCache(chunkIndex: Array[Int]): MultiArray = {
private def getSourceChunkDataWithCache(chunkIndex: Array[Int]): Future[MultiArray] = {
val chunkFilename = getChunkFilename(chunkIndex)
val chunkFilePath = relativePath.resolve(chunkFilename)
val storeKey = chunkFilePath.storeKey
chunkContentsCache.getOrLoadAndPut(storeKey)(chunkReader.read)

chunkContentsCache.getOrLoad(storeKey, chunkReader.read)
}

private def getChunkFilename(chunkIndex: Array[Int]): String =
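
`ZarrArray` now keeps a per-array LFU cache of decoded chunks, capped at 100 MB: `maxEntries = maxSizeBytes / bytesPerChunk`, so e.g. a 64×64×64 uint8 chunk (262,144 bytes) allows roughly 381 cached entries. Because `getOrLoad` caches the `Future` itself, concurrent bucket requests touching the same chunk share a single load instead of each reading it. A hedged usage sketch of the new asynchronous entry point (bucket size and offset are illustrative):

```scala
import com.scalableminds.util.geometry.Vec3Int
import com.scalableminds.util.tools.Fox
import scala.concurrent.ExecutionContext

// Illustrative caller (not part of this commit): read one 32³ bucket through the
// asynchronous API; the Fox resolves once every touched chunk has been decoded
// or served from the per-array chunk cache.
def readBucket(zarrArray: ZarrArray)(implicit ec: ExecutionContext): Fox[Array[Byte]] = {
  val shape  = Vec3Int.full(32)  // bucket edge length in voxels (illustrative)
  val offset = Vec3Int(0, 0, 0)  // position in the current magnification (illustrative)
  zarrArray.readBytesXYZ(shape, offset)
}
```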
@@ -6,7 +6,7 @@ import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.dataformats.DataCubeHandle
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import com.typesafe.scalalogging.LazyLogging
import net.liftweb.common.{Box, Empty, Failure, Full}
import net.liftweb.common.{Empty, Failure, Full}

import scala.concurrent.ExecutionContext.Implicits.global

@@ -44,7 +44,7 @@ class DataCubeCache(val maxEntries: Int)
* returns it.
*/
def withCache[T](readInstruction: DataReadInstruction)(loadF: DataReadInstruction => Fox[DataCubeHandle])(
f: DataCubeHandle => Box[T]): Fox[T] = {
f: DataCubeHandle => Fox[T]): Fox[T] = {
val cachedCubeInfo = CachedCube.from(readInstruction)

def handleUncachedCube(): Fox[T] = {
@@ -62,9 +62,10 @@ class DataCubeCache(val maxEntries: Int)
put(cachedCubeInfo, cubeFox)

cubeFox.flatMap { cube =>
val result = f(cube)
cube.finishAccess()
result
for {
result <- f(cube)
_ = cube.finishAccess()
} yield result
}
}

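
`withCache` now takes a `Fox`-returning continuation, so the bucket cut-out performed on the cached cube handle can itself be asynchronous. A hedged usage sketch, assuming `DataReadInstruction` exposes the requested `BucketPosition` as `readInstruction.bucket` (other names are illustrative):

```scala
import com.scalableminds.util.tools.Fox
import com.scalableminds.webknossos.datastore.dataformats.DataCubeHandle
import com.scalableminds.webknossos.datastore.models.requests.DataReadInstruction
import scala.concurrent.ExecutionContext.Implicits.global

// Illustrative caller: load (or reuse) the cube handle for a read instruction,
// then asynchronously cut the requested bucket out of it.
def loadBucket(cache: DataCubeCache,
               readInstruction: DataReadInstruction,
               loadCube: DataReadInstruction => Fox[DataCubeHandle]): Fox[Array[Byte]] =
  cache.withCache(readInstruction)(loadCube) { cubeHandle =>
    cubeHandle.cutOutBucket(readInstruction.bucket)
  }
```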
