
Commit 8eb3982

Apply-Agglomerate Performance Increase #2 (#4706)
* load blocks of 10 ids instead of one and remove unnecessary async code
* use hdf dataset for caching and synchronize cache correctly
* log time of handling uncached keys and create new config value for block size
* add cumsum support to access data in a blockwise fashion
* minor refactoring
* shorten range for long cumsum ranges
* fix bb traversal
* add logging
* calc global cuboid for processing
* use correct sorting
* cleanup code
* initialize offset with reader range
* update config
* update changelog
* implement pr feedback
1 parent b8fc1dd commit 8eb3982
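
The headline change in this commit is the access pattern: instead of resolving every segment id with its own HDF5 read, ids are fetched in blocks and kept in caches. The sketch below is illustrative only (class and parameter names are made up; the real logic lives in `AgglomerateIdCache`/`BoundingBoxCache` in the storage package) and shows the basic blockwise lookup that replaces one-read-per-id.

```scala
import scala.collection.mutable

// Sketch: resolve segment ids to agglomerate ids blockwise instead of one HDF5 read per id.
// `readBlock(offset, size)` stands in for reader.uint64().readArrayBlockWithOffset(datasetName, size.toInt, offset).
class BlockwiseLookup(readBlock: (Long, Long) => Array[Long], blockSize: Long) {
  private val blocks = mutable.Map.empty[Long, Array[Long]] // illustrative cache, no eviction

  def agglomerateIdFor(segmentId: Long): Long = {
    val blockStart = (segmentId / blockSize) * blockSize // align the read to a block boundary
    val block = blocks.getOrElseUpdate(blockStart, readBlock(blockStart, blockSize))
    block((segmentId - blockStart).toInt) // index into the cached block
  }
}
```

With the new default block size of 512, a warm cache can answer up to 512 consecutive segment ids from a single read.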

File tree

11 files changed (+406 / -191 lines)


CHANGELOG.unreleased.md

+2
@@ -17,6 +17,8 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released
 ### Changed
 - When d/f switching is turned off and a slice is copied with the shortcut `v`, the previous slice used as the source will always be slice - 1 and `shift + v` will always take slice + 1 as the slice to copy from. [#4728](https://github.com/scalableminds/webknossos/pull/4728)
 - Disabled the autofill feature of the brush when using this tool to erase data. [#4729](https://github.com/scalableminds/webknossos/pull/4729)
+- Improved the performance of applying agglomerate files. [#4706](https://github.com/scalableminds/webknossos/pull/4706)
+
 
 ### Fixed
 - Speed up NML import in existing tracings for NMLs with many trees (20,000+). [#4742](https://github.com/scalableminds/webknossos/pull/4742)

conf/application.conf

+4 -2
@@ -105,8 +105,10 @@ akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
 braingames.binary {
   cacheMaxSize = 40 # number of entries
   mappingCacheMaxSize = 5 # number of entries
-  agglomerateFileCacheMaxSize = 5 # number of entries
-  agglomerateCacheMaxSize = 100 # number of entries
+  agglomerateFileCacheMaxSize = 15 # number of entries
+  agglomerateCacheMaxSize = 625000 # number of entries
+  agglomerateStandardBlockSize = 512 # standard block size of cache reads, best size ~= file block size / bytes per id
+  agglomerateMaxReaderRange = 1310720 # max size per read when using cumsum.json
   loadTimeout = 10 # in seconds
   saveTimeout = 10 # in seconds
   isosurfaceTimeout = 30 # in seconds
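
Back-of-the-envelope numbers for the new defaults, assuming the 8-byte uint64 ids stored in the segment_to_agglomerate dataset: a block of 512 ids corresponds to a 4 KiB read (hence the comment "file block size / bytes per id"), agglomerateMaxReaderRange = 1310720 caps a single cumsum-guided read at 1310720 × 8 B = 10 MiB, and the enlarged id cache of 625000 entries holds roughly 5 MB of ids per agglomerate file (ignoring map overhead).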

util/src/main/scala/com/scalableminds/util/cache/LRUConcurrentCache.scala

+8
@@ -47,6 +47,14 @@ trait LRUConcurrentCache[K, V] {
       size
   }
 
+  def getOrHandleUncachedKey(key: K, handleUncachedKey: () => V): V =
+    cache.synchronized {
+      Option(cache.get(key)) match {
+        case Some(value) => value
+        case None => handleUncachedKey()
+      }
+    }
+
   def clear(): Unit =
     cache.clear()
 }
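
A self-contained sketch of how the new `getOrHandleUncachedKey` is meant to be used (the mini cache below is a stand-in for illustration, not the project's trait): the lookup and the miss handler run under the same `cache.synchronized`, so a handler that loads a value and `put`s it cannot race with concurrent readers of that key.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Stand-in LRU cache to illustrate the get-or-handle pattern; names are illustrative.
class MiniLruCache[K, V](maxEntries: Int) {
  private val cache = new JLinkedHashMap[K, V](maxEntries, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxEntries
  }

  def put(key: K, value: V): Unit = cache.synchronized { cache.put(key, value) }

  // Mirrors the method added above: a hit returns the cached value, a miss runs the
  // handler while still holding the cache monitor.
  def getOrHandleUncachedKey(key: K, handleUncachedKey: () => V): V =
    cache.synchronized {
      Option(cache.get(key)) match {
        case Some(value) => value
        case None        => handleUncachedKey()
      }
    }
}

// Usage sketch: load a block of agglomerate ids on a miss and cache it before returning.
// val block = idCache.getOrHandleUncachedKey(blockOffset, { () =>
//   val loaded = loadBlock(blockOffset) // hypothetical loader
//   idCache.put(blockOffset, loaded)
//   loaded
// })
```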

webknossos-datastore/app/com/scalableminds/webknossos/datastore/DataStoreConfig.scala

+2
@@ -25,7 +25,9 @@ class DataStoreConfig @Inject()(configuration: Configuration) extends ConfigRead
     val cacheMaxSize = get[Int]("braingames.binary.cacheMaxSize")
     val mappingCacheMaxSize = get[Int]("braingames.binary.mappingCacheMaxSize")
     val agglomerateCacheMaxSize = get[Int]("braingames.binary.agglomerateCacheMaxSize")
+    val agglomerateStandardBlockSize = get[Int]("braingames.binary.agglomerateStandardBlockSize")
     val agglomerateFileCacheMaxSize = get[Int]("braingames.binary.agglomerateFileCacheMaxSize")
+    val agglomerateMaxReaderRange = get[Int]("braingames.binary.agglomerateMaxReaderRange")
     val isosurfaceTimeout = get[Int]("braingames.binary.isosurfaceTimeout") seconds
     val isosurfaceActorPoolSize = get[Int](path = "braingames.binary.isosurfaceActorPoolSize")

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/AgglomerateService.scala

@@ -1,30 +1,32 @@
 package com.scalableminds.webknossos.datastore.services
 
 import java.nio._
-import java.nio.file.Paths
+import java.nio.file.{Files, Paths}
 
 import ch.systemsx.cisd.hdf5._
 import com.scalableminds.util.io.PathUtils
-import com.scalableminds.util.tools.{Fox, FoxImplicits}
 import com.scalableminds.webknossos.datastore.DataStoreConfig
 import com.scalableminds.webknossos.datastore.models.requests.DataServiceDataRequest
-import com.scalableminds.webknossos.datastore.storage.{AgglomerateCache, AgglomerateFileCache, CachedReader}
+import com.scalableminds.webknossos.datastore.storage.{
+  AgglomerateIdCache,
+  AgglomerateFileCache,
+  BoundingBoxCache,
+  CachedAgglomerateFile,
+  CumsumParser
+}
 import com.typesafe.scalalogging.LazyLogging
 import javax.inject.Inject
 import org.apache.commons.io.FilenameUtils
 import spire.math.{UByte, UInt, ULong, UShort}
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.Try
-
-class AgglomerateService @Inject()(config: DataStoreConfig) extends DataConverter with FoxImplicits with LazyLogging {
+class AgglomerateService @Inject()(config: DataStoreConfig) extends DataConverter with LazyLogging {
   val agglomerateDir = "agglomerates"
   val agglomerateFileExtension = "hdf5"
   val datasetName = "/segment_to_agglomerate"
   val dataBaseDir = Paths.get(config.Braingames.Binary.baseFolder)
+  val cumsumFileName = "cumsum.json"
 
-  lazy val cachedFileHandles = new AgglomerateFileCache(config.Braingames.Binary.agglomerateFileCacheMaxSize)
-  lazy val cache = new AgglomerateCache(config.Braingames.Binary.agglomerateCacheMaxSize)
+  lazy val agglomerateFileCache = new AgglomerateFileCache(config.Braingames.Binary.agglomerateFileCacheMaxSize)
 
   def exploreAgglomerates(organizationName: String, dataSetName: String, dataLayerName: String): Set[String] = {
     val layerDir = dataBaseDir.resolve(organizationName).resolve(dataSetName).resolve(dataLayerName)

@@ -38,46 +40,85 @@ class AgglomerateService @Inject()(config: DataStoreConfig) extends DataConverte
       .toSet
   }
 
-  def applyAgglomerate(request: DataServiceDataRequest)(data: Array[Byte]): Fox[Array[Byte]] = {
-    def segmentToAgglomerate(segmentId: Long) =
-      cache.withCache(request, segmentId, cachedFileHandles)(readFromFile = readHDF)(loadReader = initHDFReader)
-
+  def applyAgglomerate(request: DataServiceDataRequest)(data: Array[Byte]): Array[Byte] = {
     def byteFunc(buf: ByteBuffer, lon: Long) = buf put lon.toByte
     def shortFunc(buf: ByteBuffer, lon: Long) = buf putShort lon.toShort
     def intFunc(buf: ByteBuffer, lon: Long) = buf putInt lon.toInt
     def longFunc(buf: ByteBuffer, lon: Long) = buf putLong lon
 
-    def convertToAgglomerate(input: Array[Long],
+    def convertToAgglomerate(input: Array[ULong],
                              numBytes: Int,
-                             bufferFunc: (ByteBuffer, Long) => ByteBuffer): Fox[Array[Byte]] = {
-      val agglomerateIds = Fox.combined(input.map(segmentToAgglomerate))
-      agglomerateIds.map(
-        _.foldLeft(ByteBuffer.allocate(numBytes * input.length).order(ByteOrder.LITTLE_ENDIAN))(bufferFunc).array)
+                             bufferFunc: (ByteBuffer, Long) => ByteBuffer): Array[Byte] = {
+      val cachedAgglomerateFile = agglomerateFileCache.withCache(request)(initHDFReader)
+
+      val agglomerateIds = cachedAgglomerateFile.cache match {
+        case Left(agglomerateIdCache) =>
+          input.map(
+            el =>
+              agglomerateIdCache.withCache(el,
+                                           cachedAgglomerateFile.reader,
+                                           cachedAgglomerateFile.dataset,
+                                           cachedAgglomerateFile.size)(readHDF))
+        case Right(boundingBoxCache) =>
+          boundingBoxCache.withCache(request, input, cachedAgglomerateFile.reader)(readHDF)
+      }
+      cachedAgglomerateFile.finishAccess()
+
+      agglomerateIds
+        .foldLeft(ByteBuffer.allocate(numBytes * input.length).order(ByteOrder.LITTLE_ENDIAN))(bufferFunc)
+        .array
     }
 
     convertData(data, request.dataLayer.elementClass) match {
-      case data: Array[UByte] => convertToAgglomerate(data.map(_.toLong), 1, byteFunc)
-      case data: Array[UShort] => convertToAgglomerate(data.map(_.toLong), 2, shortFunc)
-      case data: Array[UInt] => convertToAgglomerate(data.map(_.toLong), 4, intFunc)
-      case data: Array[ULong] => convertToAgglomerate(data.map(_.toLong), 8, longFunc)
-      // we can safely map the ULong to Long because we only do operations that are compatible with the two's complement
-      case _ => Fox.successful(data)
+      case data: Array[UByte] => convertToAgglomerate(data.map(e => ULong(e.toLong)), 1, byteFunc)
+      case data: Array[UShort] => convertToAgglomerate(data.map(e => ULong(e.toLong)), 2, shortFunc)
+      case data: Array[UInt] => convertToAgglomerate(data.map(e => ULong(e.toLong)), 4, intFunc)
+      case data: Array[ULong] => convertToAgglomerate(data, 8, longFunc)
+      case _ => data
     }
   }
 
-  private def readHDF(reader: IHDF5Reader, segmentId: Long): Fox[Long] =
-    // We don't need to differentiate between the datatypes because the underlying library does the conversion for us
-    try2Fox(Try(reader.uint64().readArrayBlockWithOffset(datasetName, 1, segmentId).head))
+  // This uses a HDF5DataSet, which improves performance per call but doesn't permit parallel calls with the same dataset.
+  private def readHDF(reader: IHDF5Reader, dataSet: HDF5DataSet, segmentId: Long, blockSize: Long): Array[Long] =
+    // We don't need to differentiate between the data types because the underlying library does the conversion for us
+    reader.uint64().readArrayBlockWithOffset(dataSet, blockSize.toInt, segmentId)
+
+  // This uses the datasetName, which allows us to call it on the same hdf file in parallel.
+  private def readHDF(reader: IHDF5Reader, segmentId: Long, blockSize: Long) =
+    reader.uint64().readArrayBlockWithOffset(datasetName, blockSize.toInt, segmentId)
 
   private def initHDFReader(request: DataServiceDataRequest) = {
-    val hdfFile = Try(
+    val hdfFile =
       dataBaseDir
         .resolve(request.dataSource.id.team)
         .resolve(request.dataSource.id.name)
         .resolve(request.dataLayer.name)
         .resolve(agglomerateDir)
         .resolve(s"${request.settings.appliedAgglomerate.get}.${agglomerateFileExtension}")
-        .toFile)
-    try2Fox(hdfFile.map(f => CachedReader(HDF5FactoryProvider.get.openForReading(f))))
+        .toFile
+
+    val cumsumPath =
+      dataBaseDir
+        .resolve(request.dataSource.id.team)
+        .resolve(request.dataSource.id.name)
+        .resolve(request.dataLayer.name)
+        .resolve(agglomerateDir)
+        .resolve(cumsumFileName)
+
+    val reader = HDF5FactoryProvider.get.openForReading(hdfFile)
+
+    val cache: Either[AgglomerateIdCache, BoundingBoxCache] =
+      if (Files.exists(cumsumPath)) {
+        Right(CumsumParser.parse(cumsumPath.toFile, ULong(config.Braingames.Binary.agglomerateMaxReaderRange)))
+      } else {
+        Left(
+          new AgglomerateIdCache(config.Braingames.Binary.agglomerateCacheMaxSize,
                                 config.Braingames.Binary.agglomerateStandardBlockSize))
+      }
+
+    CachedAgglomerateFile(reader,
                          reader.`object`().openDataSet(datasetName),
                          ULong(reader.getDataSetInformation(datasetName).getNumberOfElements),
                          cache)
   }
 }
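
initHDFReader above picks one of two strategies per agglomerate file: if a `cumsum.json` index sits next to the file, a `BoundingBoxCache` (built by `CumsumParser.parse`) lets the service read contiguous segment-id ranges for a request; otherwise the `AgglomerateIdCache` falls back to blockwise per-id lookups. The commit note "shorten range for long cumsum ranges" corresponds to capping each large read at `agglomerateMaxReaderRange`; a minimal, hypothetical sketch of that capping (the real `BoundingBoxCache` is more involved):

```scala
// Sketch only: split a contiguous id range [start, end) into HDF5 reads of at most
// maxReaderRange elements each. readBlock(offset, size) stands in for
// reader.uint64().readArrayBlockWithOffset(datasetName, size.toInt, offset).
def readCappedRange(readBlock: (Long, Long) => Array[Long],
                    start: Long,
                    end: Long,
                    maxReaderRange: Long): Array[Long] =
  (start until end by maxReaderRange).toArray.flatMap { offset =>
    readBlock(offset, math.min(maxReaderRange, end - offset))
  }
```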

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/BinaryDataService.scala

+17 -33
@@ -1,31 +1,18 @@
 package com.scalableminds.webknossos.datastore.services
-import scala.reflect.io.Directory
+
 import java.io.File
-import java.nio.{ByteBuffer, ByteOrder, LongBuffer}
-import java.nio.file.{Files, Path, Paths, StandardCopyOption}
+import java.nio.file.{Files, Path}
 
 import com.scalableminds.util.geometry.{Point3D, Vector3I}
-import com.scalableminds.webknossos.datastore.models.BucketPosition
-import com.scalableminds.webknossos.datastore.models.datasource.{Category, DataLayer, ElementClass}
-import com.scalableminds.webknossos.datastore.models.requests.{
-  DataReadInstruction,
-  DataServiceDataRequest,
-  DataServiceMappingRequest,
-  MappingReadInstruction
-}
-import com.scalableminds.webknossos.datastore.storage.{
-  CachedAgglomerateFile,
-  CachedAgglomerateKey,
-  CachedCube,
-  DataCubeCache
-}
 import com.scalableminds.util.tools.ExtendedTypes.ExtendedArraySeq
 import com.scalableminds.util.tools.{Fox, FoxImplicits}
+import com.scalableminds.webknossos.datastore.models.BucketPosition
+import com.scalableminds.webknossos.datastore.models.datasource.{Category, DataLayer, ElementClass}
+import com.scalableminds.webknossos.datastore.models.requests.{DataReadInstruction, DataServiceDataRequest}
+import com.scalableminds.webknossos.datastore.storage.{AgglomerateFileKey, CachedCube, DataCubeCache}
 import com.typesafe.scalalogging.LazyLogging
 import net.liftweb.common.Full
-import spire.math.UInt
 
-import scala.collection.mutable
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 

@@ -59,27 +46,24 @@ class BinaryDataService(dataBaseDir: Path,
   def handleDataRequests(requests: List[DataServiceDataRequest]): Fox[(Array[Byte], List[Int])] = {
     def convertIfNecessary[T](isNecessary: Boolean,
                               inputArray: Array[Byte],
-                              conversionFunc: Array[Byte] => T,
-                              transformInput: Array[Byte] => T): T =
-      if (isNecessary) conversionFunc(inputArray) else transformInput(inputArray)
+                              conversionFunc: Array[Byte] => Array[Byte]): Array[Byte] =
+      if (isNecessary) conversionFunc(inputArray) else inputArray
 
     val requestsCount = requests.length
     val requestData = requests.zipWithIndex.map {
       case (request, index) =>
         for {
           data <- handleDataRequest(request)
-          mappedData <- convertIfNecessary(
-            request.settings.appliedAgglomerate.isDefined && request.dataLayer.category == Category.segmentation && request.cuboid.resolution.maxDim <= 8,
+          mappedData = convertIfNecessary(
+            request.settings.appliedAgglomerate.isDefined && request.dataLayer.category == Category.segmentation && request.cuboid.resolution.maxDim <= 16,
             data,
-            agglomerateService.applyAgglomerate(request),
-            Fox.successful(_)
+            agglomerateService.applyAgglomerate(request)
           )
           convertedData = convertIfNecessary(
             request.dataLayer.elementClass == ElementClass.uint64 && request.dataLayer.category == Category.segmentation,
             mappedData,
-            convertToUInt32,
-            identity)
-          resultData = convertIfNecessary(request.settings.halfByte, convertedData, convertToHalfByte, identity)
+            convertToUInt32)
+          resultData = convertIfNecessary(request.settings.halfByte, convertedData, convertToHalfByte)
         } yield (resultData, index)
     }
 

@@ -196,11 +180,11 @@
       cubeKey.dataSourceName == dataSetName && cubeKey.organization == organizationName && layerName.forall(
        _ == cubeKey.dataLayerName)
 
-    def matchingAgglomerate(cachedAgglomerate: CachedAgglomerateKey) =
-      cachedAgglomerate.dataSourceName == dataSetName && cachedAgglomerate.organization == organizationName && layerName
-        .forall(_ == cachedAgglomerate.dataLayerName)
+    def matchingAgglomerate(agglomerateKey: AgglomerateFileKey) =
+      agglomerateKey.dataSourceName == dataSetName && agglomerateKey.organization == organizationName && layerName
+        .forall(_ == agglomerateKey.dataLayerName)
 
-    agglomerateService.cache.clear(matchingAgglomerate)
+    agglomerateService.agglomerateFileCache.clear(matchingAgglomerate)
     cache.clear(matchingPredicate)
   }
 
206190

webknossos-datastore/app/com/scalableminds/webknossos/datastore/services/IsosurfaceService.scala

+5 -5
@@ -108,9 +108,9 @@ class IsosurfaceService @Inject()(
          Fox.successful(data)
      }
 
-    def applyAgglomerate(data: Array[Byte]): Fox[Array[Byte]] =
+    def applyAgglomerate(data: Array[Byte]): Array[Byte] =
      request.mapping match {
-        case Some(mappingName) =>
+        case Some(_) =>
          request.mappingType match {
            case Some("HDF5") =>
              val dataRequest = DataServiceDataRequest(

@@ -122,10 +122,10 @@
                Vector3I(1, 1, 1))
              agglomerateService.applyAgglomerate(dataRequest)(data)
            case _ =>
-              Fox.successful(data)
+              data
          }
        case _ =>
-          Fox.successful(data)
+          data
      }
 
    def convertData(data: Array[Byte]): Array[T] = {

@@ -194,7 +194,7 @@
 
    for {
      data <- binaryDataService.handleDataRequest(dataRequest)
-      agglomerateMappedData <- applyAgglomerate(data)
+      agglomerateMappedData = applyAgglomerate(data)
      typedData = convertData(agglomerateMappedData)
      mappedData <- applyMapping(typedData)
      mappedSegmentId <- applyMapping(Array(typedSegmentId)).map(_.head)
