diff --git a/core-performance/src/test/scala/swaydb/core/io/file/DBFileWriteReadPerformanceSpec.scala b/core-performance/src/test/scala/swaydb/core/io/file/DBFileWriteReadPerformanceSpec.scala index 1f9ef3c4c..0f80a4183 100644 --- a/core-performance/src/test/scala/swaydb/core/io/file/DBFileWriteReadPerformanceSpec.scala +++ b/core-performance/src/test/scala/swaydb/core/io/file/DBFileWriteReadPerformanceSpec.scala @@ -33,6 +33,7 @@ import swaydb.core.CommonAssertions.randomIOStrategy import swaydb.core.TestData._ import swaydb.core.actor.FileSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.segment.format.a.block.reader.BlockRefReader import swaydb.core.util.{Benchmark, BlockCacheFileIDGenerator, Bytes} import swaydb.core.{TestBase, TestSweeper} @@ -45,7 +46,7 @@ import scala.util.Random class DBFileWriteReadPerformanceSpec extends TestBase { - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeper = TestSweeper.memorySweeperMax diff --git a/core-performance/src/test/scala/swaydb/core/map/MapsPerformanceSpec.scala b/core-performance/src/test/scala/swaydb/core/map/MapsPerformanceSpec.scala index 7e6b2c47d..ca73b767e 100644 --- a/core-performance/src/test/scala/swaydb/core/map/MapsPerformanceSpec.scala +++ b/core-performance/src/test/scala/swaydb/core/map/MapsPerformanceSpec.scala @@ -38,7 +38,7 @@ // // implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default // implicit def testTimer: TestTimer = TestTimer.random -// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper +// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper // implicit val memorySweeper = TestSweeper.memorySweeperMax // implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long // diff --git a/core-performance/src/test/scala/swaydb/core/segment/format/a/SegmentReadPerformanceSpec.scala b/core-performance/src/test/scala/swaydb/core/segment/format/a/SegmentReadPerformanceSpec.scala index 36aea5be7..cc40a05f9 100644 --- a/core-performance/src/test/scala/swaydb/core/segment/format/a/SegmentReadPerformanceSpec.scala +++ b/core-performance/src/test/scala/swaydb/core/segment/format/a/SegmentReadPerformanceSpec.scala @@ -25,6 +25,7 @@ package swaydb.core.segment.format.a import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.Memory import swaydb.core.io.file.BlockCache @@ -75,7 +76,7 @@ sealed trait SegmentReadPerformanceSpec extends TestBase { val keyValuesCount = 1000000 // override def deleteFiles = false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper // implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.someMemorySweeperMax // implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.someMemorySweeper10 implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = None diff --git a/core-performance/src/test/scala/swaydb/core/segment/format/a/block/BlockReaderPerformanceSpec.scala b/core-performance/src/test/scala/swaydb/core/segment/format/a/block/BlockReaderPerformanceSpec.scala index 407b66bb8..5926b2966 100644 --- a/core-performance/src/test/scala/swaydb/core/segment/format/a/block/BlockReaderPerformanceSpec.scala +++ b/core-performance/src/test/scala/swaydb/core/segment/format/a/block/BlockReaderPerformanceSpec.scala @@ -26,8 +26,8 @@ package swaydb.core.segment.format.a.block import swaydb.IOValues._ import swaydb.core.TestData._ -import swaydb.core.actor.FileSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, DBFile} import swaydb.core.io.reader.FileReader import swaydb.core.segment.format.a.block.reader.{BlockReader, BlockRefReader} @@ -39,7 +39,7 @@ import swaydb.data.util.StorageUnits._ class BlockReaderPerformanceSpec extends TestBase { - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeper = TestSweeper.memorySweeperMax diff --git a/core-stress/src/test/scala/swaydb/core/io/file/DBFileStressWriteSpec.scala b/core-stress/src/test/scala/swaydb/core/io/file/DBFileStressWriteSpec.scala index 96ac378c0..54726a1ad 100644 --- a/core-stress/src/test/scala/swaydb/core/io/file/DBFileStressWriteSpec.scala +++ b/core-stress/src/test/scala/swaydb/core/io/file/DBFileStressWriteSpec.scala @@ -30,6 +30,7 @@ import swaydb.core.TestData._ import swaydb.core.TestSweeper.fileSweeper import swaydb.core.actor.FileSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.util.{Benchmark, BlockCacheFileIDGenerator} import swaydb.core.{TestBase, TestSweeper} import swaydb.data.util.OperatingSystem @@ -40,7 +41,7 @@ import scala.concurrent.duration._ class DBFileStressWriteSpec extends TestBase { - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeper = TestSweeper.memorySweeperMax diff --git a/core-stress/src/test/scala/swaydb/core/map/MapStressSpec.scala b/core-stress/src/test/scala/swaydb/core/map/MapStressSpec.scala index 00cde5187..4ee52c58a 100644 --- a/core-stress/src/test/scala/swaydb/core/map/MapStressSpec.scala +++ b/core-stress/src/test/scala/swaydb/core/map/MapStressSpec.scala @@ -37,7 +37,7 @@ // implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default // // implicit val skipListMerger = LevelZeroSkipListMerger -// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper +// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper // implicit val memorySweeper = TestSweeper.memorySweeperMax // implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long // diff --git a/core-stress/src/test/scala/swaydb/core/map/MapsStressSpec.scala b/core-stress/src/test/scala/swaydb/core/map/MapsStressSpec.scala index 6254780f8..347890d33 100644 --- a/core-stress/src/test/scala/swaydb/core/map/MapsStressSpec.scala +++ b/core-stress/src/test/scala/swaydb/core/map/MapsStressSpec.scala @@ -40,7 +40,7 @@ // implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default // implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long // implicit def testTimer: TestTimer = TestTimer.Empty -// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper +// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper // implicit val memorySweeper = TestSweeper.memorySweeperMax // // import swaydb.core.map.serializer.LevelZeroMapEntryReader._ diff --git a/core/src/main/scala/swaydb/core/CoreInitializer.scala b/core/src/main/scala/swaydb/core/CoreInitializer.scala index 653e07728..b19733713 100644 --- a/core/src/main/scala/swaydb/core/CoreInitializer.scala +++ b/core/src/main/scala/swaydb/core/CoreInitializer.scala @@ -48,7 +48,7 @@ import swaydb.data.config._ import swaydb.data.order.{KeyOrder, TimeOrder} import swaydb.data.slice.Slice import swaydb.data.storage.{AppendixStorage, LevelStorage} -import swaydb.{ActorWire, Bag, Error, IO, Scheduler} +import swaydb.{ActorRef, ActorWire, Bag, Error, IO, Scheduler} import scala.concurrent.Await import scala.concurrent.duration._ @@ -124,7 +124,7 @@ private[core] object CoreInitializer extends LazyLogging { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore): IO[swaydb.Error.Boot, Core[Bag.Less]] = { - implicit val fileSweeper: FileSweeper.Enabled = + implicit val fileSweeper: ActorRef[FileSweeper.Command, Unit] = FileSweeper(fileCache) val memorySweeper: Option[MemorySweeper.Enabled] = @@ -152,7 +152,7 @@ private[core] object CoreInitializer extends LazyLogging { ThrottleCompactor implicit val bufferCleaner: ByteBufferSweeperActor = - ByteBufferSweeper()(Scheduler()(fileSweeper.ec)) + ByteBufferSweeper()(Scheduler()(fileSweeper.executionContext)) def createLevel(id: Long, nextLevel: Option[NextLevel], diff --git a/core/src/main/scala/swaydb/core/CoreShutdown.scala b/core/src/main/scala/swaydb/core/CoreShutdown.scala index d6d232af5..01ff15d43 100644 --- a/core/src/main/scala/swaydb/core/CoreShutdown.scala +++ b/core/src/main/scala/swaydb/core/CoreShutdown.scala @@ -25,8 +25,9 @@ package swaydb.core import com.typesafe.scalalogging.LazyLogging -import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor +import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper} import swaydb.core.io.file.BlockCache import swaydb.core.level.compaction.Compactor import swaydb.core.level.compaction.throttle.ThrottleState @@ -48,7 +49,7 @@ private[core] object CoreShutdown extends LazyLogging { def shutdown(zero: LevelZero, retryOnBusyDelay: FiniteDuration)(implicit compactor: ActorWire[Compactor[ThrottleState], ThrottleState], - fileSweeper: Option[FileSweeper.Enabled], + fileSweeper: Option[FileSweeperActor], blockCache: Option[BlockCache.State], keyValueMemorySweeper: Option[MemorySweeper.KeyValue], scheduler: Scheduler, @@ -111,7 +112,7 @@ private[core] object CoreShutdown extends LazyLogging { _ => fileSweeper match { case Some(fileSweeper) => - logger.info(s"Terminating ${classOf[FileSweeper].getSimpleName}Actor.") + logger.info(s"Terminating FileSweeperActor.") fileSweeper.terminateAndRecover(retryOnBusyDelay) case None => diff --git a/core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala b/core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala index e2db36b2b..287d309f8 100644 --- a/core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala +++ b/core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala @@ -48,7 +48,7 @@ private[core] object ByteBufferSweeper extends LazyLogging { implicit class ByteBufferSweeperActorImplicits(cache: ByteBufferSweeperActor) { @inline def actor: ActorRef[Command, State] = - cache.value(()) + this.cache.value(()) } /** diff --git a/core/src/main/scala/swaydb/core/actor/FileSweeper.scala b/core/src/main/scala/swaydb/core/actor/FileSweeper.scala index 7603267dd..8b70ed93d 100644 --- a/core/src/main/scala/swaydb/core/actor/FileSweeper.scala +++ b/core/src/main/scala/swaydb/core/actor/FileSweeper.scala @@ -26,12 +26,11 @@ package swaydb.core.actor import java.nio.file.Path import com.typesafe.scalalogging.LazyLogging -import swaydb.core.cache.Lazy +import swaydb.core.actor.ByteBufferSweeper.{ByteBufferSweeperActor, State} +import swaydb.core.cache.{Cache, CacheNoIO} import swaydb.data.config.{ActorConfig, FileCache} -import swaydb.{Actor, ActorRef, Bag, IO, Scheduler} +import swaydb.{Actor, ActorRef, IO} -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.FiniteDuration import scala.ref.WeakReference private[core] trait FileSweeperItem { @@ -41,68 +40,72 @@ private[core] trait FileSweeperItem { def isOpen: Boolean } -private[swaydb] trait FileSweeper { - def close(file: FileSweeperItem): Unit -} /** * Actor that manages closing and delete files that are overdue. */ private[swaydb] object FileSweeper extends LazyLogging { - /** - * Disables File management. This is generally enabled for in-memory databases - * where closing or deleting files is not really required since GC cleans up these - * files. - */ - case object Disabled extends FileSweeper { - def close(file: FileSweeperItem): Unit = () - } + type FileSweeperActor = ActorRef[FileSweeper.Command, Unit] - /** - * Enables file management. - */ - sealed trait Enabled extends FileSweeper { - def ec: ExecutionContext - def close(file: FileSweeperItem): Unit - def delete(file: FileSweeperItem): Unit - def messageCount(): Int - def terminateAndRecover[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit bag: Bag.Async[BAG], - scheduler: Scheduler): BAG[Unit] + implicit class FileSweeperActorActorImplicits(cache: CacheNoIO[Unit, FileSweeperActor]) { + @inline def actor = + this.cache.value(()) } - private sealed trait Action { + sealed trait Command { def isDelete: Boolean } - private object Action { - case class Delete(file: FileSweeperItem) extends Action { + object Command { + //Delete cannot be a WeakReference because Levels can + //remove references to the file after eventualDelete is invoked. + //If the file gets garbage collected due to it being WeakReference before + //delete on the file is triggered, the physical file will remain on disk. + case class Delete(file: FileSweeperItem) extends Command { def isDelete: Boolean = true } - case class Close(file: WeakReference[FileSweeperItem]) extends Action { + + object Close { + def apply(file: FileSweeperItem): Close = + new Close(new WeakReference[FileSweeperItem](file)) + } + case class Close private(file: WeakReference[FileSweeperItem]) extends Command { def isDelete: Boolean = false } } - def weigher(action: Action) = - if (action.isDelete) 10 else 1 + def weigher(command: Command): Int = + if (command.isDelete) 10 else 1 - def apply(fileCache: FileCache): Option[FileSweeper.Enabled] = + def apply(fileCache: FileCache): Option[FileSweeperActor] = fileCache match { case FileCache.Disable => None + case enable: FileCache.Enable => Some(apply(enable)) } - def apply(fileCache: FileCache.Enable): FileSweeper.Enabled = + + def apply(fileCache: FileCache.Enable): FileSweeperActor = apply( maxOpenSegments = fileCache.maxOpen, actorConfig = fileCache.actorConfig - ) + ).value(()) + + def apply(maxOpenSegments: Int, + actorConfig: ActorConfig): CacheNoIO[Unit, FileSweeperActor] = + Cache.noIO[Unit, ActorRef[Command, Unit]](synchronised = true, stored = true, initial = None) { + (_, _) => + createActor( + maxOpenSegments = maxOpenSegments, + actorConfig = actorConfig + ) + } - private def processAction(action: Action): Unit = - action match { - case Action.Delete(file) => + private def processCommand(command: Command): Unit = + command match { + case Command.Delete(file) => try file.delete() catch { @@ -110,7 +113,7 @@ private[swaydb] object FileSweeper extends LazyLogging { logger.error(s"Failed to delete file. ${file.path}", exception) } - case Action.Close(file) => + case Command.Close(file) => file.get foreach { file => try @@ -122,68 +125,28 @@ private[swaydb] object FileSweeper extends LazyLogging { } } - private def createActor(maxOpenSegments: Int, actorConfig: ActorConfig): ActorRef[Action, Unit] = - Actor.cacheFromConfig[Action]( + private def createActor(maxOpenSegments: Int, actorConfig: ActorConfig): ActorRef[Command, Unit] = + Actor.cacheFromConfig[Command]( config = actorConfig, stashCapacity = maxOpenSegments, weigher = FileSweeper.weigher ) { - case (action, _) => - processAction(action) - } recoverException[Action] { - case (action, io, _) => + case (command, _) => + processCommand(command) + } recoverException[Command] { + case (command, io, _) => io match { case IO.Right(Actor.Error.TerminatedActor) => - processAction(action) + processCommand(command) case IO.Left(exception) => - action match { - case Action.Delete(file) => + command match { + case Command.Delete(file) => logger.error(s"Failed to delete file. Path = ${file.path}.", exception) - case Action.Close(file) => + case Command.Close(file) => logger.error(s"Failed to close file. WeakReference(path: Path) = ${file.get.map(_.path)}.", exception) } } } - - def apply(maxOpenSegments: Int, actorConfig: ActorConfig): FileSweeper.Enabled = { - val lazyActor = - Lazy.value[ActorRef[Action, Unit]]( - synchronised = true, - stored = true, - initial = None - ) - - def actor(): ActorRef[Action, Unit] = - lazyActor.getOrSet(createActor(maxOpenSegments, actorConfig)) - - new FileSweeper.Enabled { - - override def ec = actorConfig.ec - - override def close(file: FileSweeperItem): Unit = - actor() send Action.Close(new WeakReference[FileSweeperItem](file)) - - //Delete cannot be a WeakReference because Levels can - //remove references to the file after eventualDelete is invoked. - //If the file gets garbage collected due to it being WeakReference before - //delete on the file is triggered, the physical file will remain on disk. - override def delete(file: FileSweeperItem): Unit = - actor() send Action.Delete(file) - - override def messageCount(): Int = - if (lazyActor.isDefined) - actor().messageCount - else - 0 - - override def terminateAndRecover[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit bag: Bag.Async[BAG], - scheduler: Scheduler): BAG[Unit] = - if (lazyActor.isDefined) - actor().terminateAndRecover(retryOnBusyDelay) - else - bag.unit - } - } } diff --git a/core/src/main/scala/swaydb/core/io/file/DBFile.scala b/core/src/main/scala/swaydb/core/io/file/DBFile.scala index 14cf389a0..af4717783 100644 --- a/core/src/main/scala/swaydb/core/io/file/DBFile.scala +++ b/core/src/main/scala/swaydb/core/io/file/DBFile.scala @@ -31,6 +31,7 @@ import swaydb.Error.IO.ExceptionHandler import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, FileSweeperItem} import swaydb.core.cache.Cache import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.data.Reserve import swaydb.data.config.IOStrategy import swaydb.data.slice.Slice @@ -46,7 +47,7 @@ object DBFile extends LazyLogging { ioStrategy: IOStrategy, file: Option[DBFileType], blockCacheFileId: Long, - autoClose: Boolean)(implicit fileSweeper: FileSweeper, + autoClose: Boolean)(implicit fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor) = { //FIX-ME: need a better solution. @@ -98,7 +99,7 @@ object DBFile extends LazyLogging { ) if (autoClose) - fileSweeper.close(closer) + fileSweeper send FileSweeper.Command.Close(closer) openResult } @@ -106,14 +107,14 @@ object DBFile extends LazyLogging { self = cache - if (autoClose && file.isDefined) fileSweeper.close(closer) + if (autoClose && file.isDefined) fileSweeper send FileSweeper.Command.Close(closer) cache } def channelWrite(path: Path, ioStrategy: IOStrategy, blockCacheFileId: Long, - autoClose: Boolean)(implicit fileSweeper: FileSweeper, + autoClose: Boolean)(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = { val file = ChannelFile.write(path, blockCacheFileId) @@ -140,7 +141,7 @@ object DBFile extends LazyLogging { ioStrategy: IOStrategy, autoClose: Boolean, blockCacheFileId: Long, - checkExists: Boolean = true)(implicit fileSweeper: FileSweeper, + checkExists: Boolean = true)(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = if (checkExists && Effect.notExists(path)) @@ -170,7 +171,7 @@ object DBFile extends LazyLogging { autoClose: Boolean, deleteOnClean: Boolean, blockCacheFileId: Long, - bytes: Iterable[Slice[Byte]])(implicit fileSweeper: FileSweeper, + bytes: Iterable[Slice[Byte]])(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = { val totalWritten = @@ -201,7 +202,7 @@ object DBFile extends LazyLogging { autoClose: Boolean, deleteOnClean: Boolean, blockCacheFileId: Long, - bytes: Slice[Byte])(implicit fileSweeper: FileSweeper, + bytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = //do not write bytes if the Slice has empty bytes. @@ -227,7 +228,7 @@ object DBFile extends LazyLogging { autoClose: Boolean, deleteOnClean: Boolean, blockCacheFileId: Long, - checkExists: Boolean = true)(implicit fileSweeper: FileSweeper, + checkExists: Boolean = true)(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = if (checkExists && Effect.notExists(path)) { @@ -257,7 +258,7 @@ object DBFile extends LazyLogging { bufferSize: Long, blockCacheFileId: Long, autoClose: Boolean, - deleteOnClean: Boolean)(implicit fileSweeper: FileSweeper, + deleteOnClean: Boolean)(implicit fileSweeper: FileSweeperActor, blockCache: Option[BlockCache.State], bufferCleaner: ByteBufferSweeperActor): DBFile = { val file = diff --git a/core/src/main/scala/swaydb/core/level/Level.scala b/core/src/main/scala/swaydb/core/level/Level.scala index d4a72457c..0df87a976 100644 --- a/core/src/main/scala/swaydb/core/level/Level.scala +++ b/core/src/main/scala/swaydb/core/level/Level.scala @@ -34,6 +34,7 @@ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.{KeyValue, _} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect._ import swaydb.core.io.file.{BlockCache, Effect} import swaydb.core.level.seek._ @@ -104,7 +105,7 @@ private[core] object Level extends LazyLogging { functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor): IO[swaydb.Error.Level, Level] = { //acquire lock on folder acquireLock(levelStorage) flatMap { @@ -358,7 +359,7 @@ private[core] case class Level(dirs: Seq[Dir], removeWriter: MapEntryWriter[MapEntry.Remove[Slice[Byte]]], addWriter: MapEntryWriter[MapEntry.Put[Slice[Byte], Segment]], keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - val fileSweeper: FileSweeper.Enabled, + val fileSweeper: FileSweeperActor, val bufferCleaner: ByteBufferSweeperActor, val blockCache: Option[BlockCache.State], val segmentIDGenerator: IDGenerator, diff --git a/core/src/main/scala/swaydb/core/level/tool/AppendixRepairer.scala b/core/src/main/scala/swaydb/core/level/tool/AppendixRepairer.scala index 836734938..5a553fc10 100644 --- a/core/src/main/scala/swaydb/core/level/tool/AppendixRepairer.scala +++ b/core/src/main/scala/swaydb/core/level/tool/AppendixRepairer.scala @@ -33,6 +33,7 @@ import swaydb.IO._ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect import swaydb.core.level.AppendixSkipListMerger import swaydb.core.map.serializer.MapEntryWriter @@ -50,7 +51,7 @@ private[swaydb] object AppendixRepairer extends LazyLogging { def apply(levelPath: Path, strategy: AppendixRepairStrategy)(implicit keyOrder: KeyOrder[Slice[Byte]], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore): IO[swaydb.Error.Level, Unit] = { @@ -170,7 +171,7 @@ private[swaydb] object AppendixRepairer extends LazyLogging { def buildAppendixMap(appendixDir: Path, segments: Slice[Segment])(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[Slice[Byte], Segment]], diff --git a/core/src/main/scala/swaydb/core/level/zero/LevelZero.scala b/core/src/main/scala/swaydb/core/level/zero/LevelZero.scala index 0d990987e..df07f0a91 100644 --- a/core/src/main/scala/swaydb/core/level/zero/LevelZero.scala +++ b/core/src/main/scala/swaydb/core/level/zero/LevelZero.scala @@ -35,6 +35,7 @@ import swaydb.core.data.Value.FromValue import swaydb.core.data._ import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect import swaydb.core.level.seek._ import swaydb.core.level.{LevelRef, LevelSeek, NextLevel} @@ -51,7 +52,7 @@ import swaydb.data.order.{KeyOrder, TimeOrder} import swaydb.data.slice.{Slice, SliceOption} import swaydb.data.storage.Level0Storage import swaydb.data.util.StorageUnits._ -import swaydb.{Bag, IO, OK} +import swaydb.{Actor, Bag, IO, OK} import scala.concurrent.duration.{Deadline, _} import scala.jdk.CollectionConverters._ @@ -77,7 +78,7 @@ private[core] object LevelZero extends LazyLogging { logger.info("cacheKeyValueIds is false. Key-value IDs cache disabled!") //LevelZero does not required FileSweeper since they are all Map files. - implicit val fileSweeper: FileSweeper = FileSweeper.Disabled + implicit val fileSweeper: FileSweeperActor = Actor.deadActor() implicit val skipListMerger: SkipListMerger[SliceOption[Byte], MemoryOption, Slice[Byte], Memory] = LevelZeroSkipListMerger val mapsAndPathAndLock = diff --git a/core/src/main/scala/swaydb/core/map/Map.scala b/core/src/main/scala/swaydb/core/map/Map.scala index 9d0619197..5023d3fe8 100644 --- a/core/src/main/scala/swaydb/core/map/Map.scala +++ b/core/src/main/scala/swaydb/core/map/Map.scala @@ -33,6 +33,7 @@ import swaydb.core.actor.FileSweeper import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.map.serializer.{MapEntryReader, MapEntryWriter} import swaydb.core.util.IDGenerator import swaydb.core.util.skiplist.{SkipList, SkipListBase} @@ -57,7 +58,7 @@ private[core] object Map extends LazyLogging { dropCorruptedTailEntries: Boolean)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, writer: MapEntryWriter[MapEntry.Put[K, V]], reader: MapEntryReader[MapEntry[K, V]], @@ -80,7 +81,7 @@ private[core] object Map extends LazyLogging { fileSize: Long)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, writer: MapEntryWriter[MapEntry.Put[K, V]], skipListMerger: SkipListMerger[OK, OV, K, V]): PersistentMap[OK, OV, K, V] = diff --git a/core/src/main/scala/swaydb/core/map/Maps.scala b/core/src/main/scala/swaydb/core/map/Maps.scala index a7ab48d05..6e658d1e1 100644 --- a/core/src/main/scala/swaydb/core/map/Maps.scala +++ b/core/src/main/scala/swaydb/core/map/Maps.scala @@ -36,6 +36,7 @@ import swaydb.core.actor.FileSweeper import swaydb.core.brake.BrakePedal import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect import swaydb.core.io.file.Effect._ import swaydb.core.map.serializer.{MapEntryReader, MapEntryWriter} @@ -56,7 +57,7 @@ private[core] object Maps extends LazyLogging { fileSize: Long, acceleration: LevelZeroMeter => Accelerator)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -83,7 +84,7 @@ private[core] object Maps extends LazyLogging { acceleration: LevelZeroMeter => Accelerator, recovery: RecoveryMode)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -151,7 +152,7 @@ private[core] object Maps extends LazyLogging { fileSize: Long, recovery: RecoveryMode)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -277,7 +278,7 @@ private[core] object Maps extends LazyLogging { def nextMapUnsafe[OK, OV, K <: OK, V <: OV](nextMapSize: Long, currentMap: Map[OK, OV, K, V])(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -361,7 +362,7 @@ private[core] class Maps[OK, OV, K <: OK, V <: OV](val maps: ConcurrentLinkedDeq acceleration: LevelZeroMeter => Accelerator, @volatile private var currentMap: Map[OK, OV, K, V])(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, val bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], diff --git a/core/src/main/scala/swaydb/core/map/PersistentMap.scala b/core/src/main/scala/swaydb/core/map/PersistentMap.scala index 96566ea23..49ab89dba 100644 --- a/core/src/main/scala/swaydb/core/map/PersistentMap.scala +++ b/core/src/main/scala/swaydb/core/map/PersistentMap.scala @@ -34,6 +34,7 @@ import swaydb.IO._ import swaydb.core.actor.{ByteBufferSweeper, FileSweeper} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect._ import swaydb.core.io.file.{DBFile, Effect} import swaydb.core.map.serializer.{MapCodec, MapEntryReader, MapEntryWriter} @@ -56,7 +57,7 @@ private[map] object PersistentMap extends LazyLogging { nullValue: OV)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, reader: MapEntryReader[MapEntry[K, V]], writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -95,7 +96,7 @@ private[map] object PersistentMap extends LazyLogging { nullKey: OK, nullValue: OV)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], @@ -124,7 +125,7 @@ private[map] object PersistentMap extends LazyLogging { private[map] def firstFile(folder: Path, memoryMapped: MMAP.Map, - fileSize: Long)(implicit fileSweeper: FileSweeper, + fileSize: Long)(implicit fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor): DBFile = memoryMapped match { case MMAP.Enabled(deleteOnClean) => @@ -155,7 +156,7 @@ private[map] object PersistentMap extends LazyLogging { mapReader: MapEntryReader[MapEntry[K, V]], skipListMerger: SkipListMerger[OK, OV, K, V], keyOrder: KeyOrder[K], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore): (RecoveryResult[DBFile], Boolean) = { @@ -227,11 +228,17 @@ private[map] object PersistentMap extends LazyLogging { mmap: MMAP.Map, fileSize: Long, skipList: SkipListConcurrent[OK, OV, K, V])(implicit writer: MapEntryWriter[MapEntry.Put[K, V]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor): Option[DBFile] = oldFiles.lastOption map { lastFile => - val file = nextFile(lastFile, mmap, fileSize, skipList) + val file = + nextFile( + currentFile = lastFile, + mmap = mmap, + size = fileSize, + skipList = skipList + ) //Next file successfully created. delete all old files without the last which gets deleted by nextFile. try { oldFiles.dropRight(1).foreach(_.delete()) @@ -252,7 +259,7 @@ private[map] object PersistentMap extends LazyLogging { mmap: MMAP.Map, size: Long, skipList: SkipListConcurrent[OK, OV, K, V])(implicit writer: MapEntryWriter[MapEntry.Put[K, V]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor): DBFile = { val nextPath = currentFile.path.incrementFileId @@ -293,7 +300,7 @@ protected case class PersistentMap[OK, OV, K <: OK, V <: OV](path: Path, private var currentFile: DBFile, private val hasRangeInitial: Boolean)(implicit keyOrder: KeyOrder[K], timeOrder: TimeOrder[Slice[Byte]], - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, val bufferCleaner: ByteBufferSweeperActor, functionStore: FunctionStore, writer: MapEntryWriter[MapEntry.Put[K, V]], diff --git a/core/src/main/scala/swaydb/core/map/serializer/AppendixMapEntryReader.scala b/core/src/main/scala/swaydb/core/map/serializer/AppendixMapEntryReader.scala index d8a597b34..f5cb653c4 100644 --- a/core/src/main/scala/swaydb/core/map/serializer/AppendixMapEntryReader.scala +++ b/core/src/main/scala/swaydb/core/map/serializer/AppendixMapEntryReader.scala @@ -27,6 +27,7 @@ package swaydb.core.map.serializer import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.map.MapEntry import swaydb.core.segment.{Segment, SegmentIO, SegmentSerialiser} @@ -39,7 +40,7 @@ private[core] object AppendixMapEntryReader { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO): AppendixMapEntryReader = @@ -50,7 +51,7 @@ private[core] class AppendixMapEntryReader(mmapSegment: MMAP.Segment)(implicit k timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO) { diff --git a/core/src/main/scala/swaydb/core/map/timer/PersistentTimer.scala b/core/src/main/scala/swaydb/core/map/timer/PersistentTimer.scala index 088b70ba5..461156b4e 100644 --- a/core/src/main/scala/swaydb/core/map/timer/PersistentTimer.scala +++ b/core/src/main/scala/swaydb/core/map/timer/PersistentTimer.scala @@ -28,7 +28,7 @@ import java.nio.file.Path import com.typesafe.scalalogging.LazyLogging import swaydb.Error.Map.ExceptionHandler -import swaydb.IO +import swaydb.{Actor, ActorRef, IO} import swaydb.core.actor.FileSweeper import swaydb.core.data.Time import swaydb.core.function.FunctionStore @@ -68,7 +68,7 @@ private[core] object PersistentTimer extends LazyLogging { writer: MapEntryWriter[MapEntry.Put[Slice[Byte], Slice[Byte]]], reader: MapEntryReader[MapEntry[Slice[Byte], Slice[Byte]]]): IO[swaydb.Error.Map, PersistentTimer] = { //Disabled because autoClose is not required here. - implicit val fileSweeper = FileSweeper.Disabled + implicit val fileSweeper: ActorRef[FileSweeper.Command, Unit] = Actor.deadActor() IO { Map.persistent[SliceOption[Byte], SliceOption[Byte], Slice[Byte], Slice[Byte]]( diff --git a/core/src/main/scala/swaydb/core/segment/MemorySegment.scala b/core/src/main/scala/swaydb/core/segment/MemorySegment.scala index 61dc4fb54..2e271aee6 100644 --- a/core/src/main/scala/swaydb/core/segment/MemorySegment.scala +++ b/core/src/main/scala/swaydb/core/segment/MemorySegment.scala @@ -29,6 +29,7 @@ import java.util.function.Consumer import com.typesafe.scalalogging.LazyLogging import swaydb.core.actor.FileSweeper +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.data.{Memory, _} import swaydb.core.function.FunctionStore import swaydb.core.level.PathsDistributor @@ -62,7 +63,7 @@ protected case class MemorySegment(path: Path, nearestPutDeadline: Option[Deadline])(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled) extends Segment with LazyLogging { + fileSweeper: FileSweeperActor) extends Segment with LazyLogging { @volatile private var deleted = false @@ -276,7 +277,7 @@ protected case class MemorySegment(path: Path, !deleted override def deleteSegmentsEventually: Unit = - fileSweeper.delete(this) + fileSweeper send FileSweeper.Command.Delete(this) override def clearCachedKeyValues(): Unit = () diff --git a/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala b/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala index ddcf6cedf..17969008e 100644 --- a/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala +++ b/core/src/main/scala/swaydb/core/segment/PersistentSegmentMany.scala @@ -33,6 +33,7 @@ import swaydb.core.cache.{Cache, CacheNoIO} import swaydb.core.data._ import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, DBFile, Effect} import swaydb.core.level.PathsDistributor import swaydb.core.segment.format.a.block.binarysearch.BinarySearchIndexBlock @@ -67,7 +68,7 @@ protected object PersistentSegmentMany { functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegmentMany = { val initial = @@ -143,7 +144,7 @@ protected object PersistentSegmentMany { functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegmentMany = { @@ -196,7 +197,7 @@ protected object PersistentSegmentMany { keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCacheMemorySweeper: Option[MemorySweeper.Block], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegmentMany = { @@ -419,7 +420,7 @@ protected case class PersistentSegmentMany(file: DBFile, timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], segmentIO: SegmentIO) extends PersistentSegment with LazyLogging { @@ -474,7 +475,7 @@ protected case class PersistentSegmentMany(file: DBFile, file.isFileDefined def deleteSegmentsEventually = - fileSweeper.delete(this) + fileSweeper send FileSweeper.Command.Delete(this) def delete: Unit = { logger.trace(s"{}: DELETING FILE", path) diff --git a/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala b/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala index bcb306cc0..69cd357d1 100644 --- a/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala +++ b/core/src/main/scala/swaydb/core/segment/PersistentSegmentOne.scala @@ -33,6 +33,7 @@ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.{KeyValue, Persistent, PersistentOption} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, DBFile} import swaydb.core.level.PathsDistributor import swaydb.core.segment.format.a.block.binarysearch.BinarySearchIndexBlock @@ -65,7 +66,7 @@ protected object PersistentSegmentOne { functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegmentOne = PersistentSegmentOne( @@ -101,7 +102,7 @@ protected object PersistentSegmentOne { functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegmentOne = { @@ -146,7 +147,7 @@ protected object PersistentSegmentOne { functionStore: FunctionStore, blockCache: Option[BlockCache.State], keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, segmentIO: SegmentIO): PersistentSegment = { @@ -223,7 +224,7 @@ protected case class PersistentSegmentOne(file: DBFile, timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, blockCache: Option[BlockCache.State], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], segmentIO: SegmentIO) extends PersistentSegment with LazyLogging { @@ -249,7 +250,7 @@ protected case class PersistentSegmentOne(file: DBFile, file.isFileDefined def deleteSegmentsEventually = - fileSweeper.delete(this) + fileSweeper send FileSweeper.Command.Delete(this) def delete: Unit = { logger.trace(s"{}: DELETING FILE", path) diff --git a/core/src/main/scala/swaydb/core/segment/Segment.scala b/core/src/main/scala/swaydb/core/segment/Segment.scala index c74a2ae11..6dde50b9c 100644 --- a/core/src/main/scala/swaydb/core/segment/Segment.scala +++ b/core/src/main/scala/swaydb/core/segment/Segment.scala @@ -34,6 +34,7 @@ import swaydb.core.actor.{FileSweeper, FileSweeperItem, MemorySweeper} import swaydb.core.data._ import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, DBFile, Effect} import swaydb.core.level.PathsDistributor import swaydb.core.map.Map @@ -82,7 +83,7 @@ private[core] object Segment extends LazyLogging { keyValues: MergeStats.Memory.Closed[Iterable])(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, idGenerator: IDGenerator): Slice[MemorySegment] = if (keyValues.isEmpty) { throw IO.throwable("Empty key-values submitted to memory Segment.") @@ -207,7 +208,7 @@ private[core] object Segment extends LazyLogging { mergeStats: MergeStats.Persistent.Closed[Iterable])(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], @@ -239,7 +240,7 @@ private[core] object Segment extends LazyLogging { segments: Iterable[TransientSegment])(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], blockCache: Option[BlockCache.State], @@ -294,7 +295,7 @@ private[core] object Segment extends LazyLogging { private def segmentFile(path: Path, mmap: MMAP.Segment, segmentBytes: Slice[Slice[Byte]])(implicit segmentIO: SegmentIO, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State]): DBFile = mmap match { @@ -361,7 +362,7 @@ private[core] object Segment extends LazyLogging { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO, @@ -427,7 +428,7 @@ private[core] object Segment extends LazyLogging { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO, @@ -464,7 +465,7 @@ private[core] object Segment extends LazyLogging { maxKeyValueCountPerSegment: Int)(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, idGenerator: IDGenerator): Slice[MemorySegment] = copyToMemory( keyValues = segment.iterator(), @@ -483,7 +484,7 @@ private[core] object Segment extends LazyLogging { createdInLevel: Int)(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, idGenerator: IDGenerator): Slice[MemorySegment] = { val builder = new MergeStats.Memory.Closed[Iterable]( @@ -515,7 +516,7 @@ private[core] object Segment extends LazyLogging { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO): PersistentSegment = { @@ -612,7 +613,7 @@ private[core] object Segment extends LazyLogging { functionStore: FunctionStore, blockCache: Option[BlockCache.State], keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor): PersistentSegment = { implicit val segmentIO: SegmentIO = SegmentIO.defaultSynchronisedStoredIfCompressed diff --git a/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala b/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala index d5dcd86dc..9430d8270 100644 --- a/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala +++ b/core/src/main/scala/swaydb/core/segment/SegmentSerialiser.scala @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, Effect} import swaydb.core.util.Options._ import swaydb.core.util.{BlockCacheFileIDGenerator, Bytes, Extension, MinMax} @@ -52,7 +53,7 @@ private[core] sealed trait SegmentSerialiser { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO): Segment @@ -116,7 +117,7 @@ private[core] object SegmentSerialiser { timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, keyValueMemorySweeper: Option[MemorySweeper.KeyValue], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State], segmentIO: SegmentIO): Segment = { diff --git a/core/src/test/scala/swaydb/core/TestBase.scala b/core/src/test/scala/swaydb/core/TestBase.scala index 688ae55e9..e7ae66ea9 100644 --- a/core/src/test/scala/swaydb/core/TestBase.scala +++ b/core/src/test/scala/swaydb/core/TestBase.scala @@ -41,6 +41,7 @@ import swaydb.core.TestSweeper.{fileSweeper, _} import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.{Memory, MemoryOption, Time} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.{BlockCache, DBFile, Effect} import swaydb.core.io.reader.FileReader import swaydb.core.level.compaction._ @@ -213,7 +214,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve flushOnOverflow: Boolean = false, mmap: MMAP.Map = MMAP.Enabled(OperatingSystem.isWindows))(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper, + fileSweeper: FileSweeperActor = TestSweeper.fileSweeper, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long): map.Map[SliceOption[Byte], MemoryOption, Slice[Byte], Memory] = { import swaydb.core.map.serializer.LevelZeroMapEntryReader._ @@ -257,7 +258,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve bloomFilterConfig: BloomFilterBlock.Config = BloomFilterBlock.Config.random, segmentConfig: SegmentBlock.Config = SegmentBlock.Config.random.copy(mmap = mmapSegments, minSize = 100.mb, maxCount = Int.MaxValue))(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper, + fileSweeper: FileSweeperActor = TestSweeper.fileSweeper, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache): Segment = { @@ -294,7 +295,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve bloomFilterConfig: BloomFilterBlock.Config = BloomFilterBlock.Config.random, segmentConfig: SegmentBlock.Config = SegmentBlock.Config.random.copy(mmap = mmapSegments))(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper, + fileSweeper: FileSweeperActor = TestSweeper.fileSweeper, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long, pathsDistributor: PathsDistributor, @@ -361,7 +362,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve segmentConfig: SegmentBlock.Config = SegmentBlock.Config.random2(pushForward = false, deleteEventually = false, mmap = mmapSegments), keyValues: Slice[Memory] = Slice.empty)(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper, + fileSweeper: FileSweeperActor = TestSweeper.fileSweeper, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long): Level = @@ -393,7 +394,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve throttle: LevelZeroMeter => FiniteDuration = _ => Duration.Zero)(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long, - fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper, + fileSweeper: FileSweeperActor = TestSweeper.fileSweeper, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner): LevelZero = LevelZero( mapSize = mapSize, @@ -428,7 +429,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve /** * Creates all file types currently supported which are MMAP and FileChannel. */ - def createDBFiles(mmapPath: Path, mmapBytes: Slice[Byte], channelPath: Path, channelBytes: Slice[Byte])(implicit fileSweeper: FileSweeper, + def createDBFiles(mmapPath: Path, mmapBytes: Slice[Byte], channelPath: Path, channelBytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, blockCache: Option[BlockCache.State]): List[DBFile] = List( @@ -436,7 +437,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve createChannelWriteAndRead(channelPath, channelBytes) ) - def createDBFiles(mmapBytes: Slice[Byte], channelBytes: Slice[Byte])(implicit fileSweeper: FileSweeper, + def createDBFiles(mmapBytes: Slice[Byte], channelBytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor, cleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State]): List[DBFile] = List( @@ -444,7 +445,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve createChannelWriteAndRead(randomFilePath, channelBytes) ) - def createMMAPWriteAndRead(path: Path, bytes: Slice[Byte])(implicit fileSweeper: FileSweeper, + def createMMAPWriteAndRead(path: Path, bytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor, cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache): DBFile = DBFile.mmapWriteAndRead( @@ -456,7 +457,7 @@ trait TestBase extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eve bytes = bytes ) - def createChannelWriteAndRead(path: Path, bytes: Slice[Byte])(implicit fileSweeper: FileSweeper, + def createChannelWriteAndRead(path: Path, bytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor, cleaner: ByteBufferSweeperActor, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache): DBFile = { val blockCacheFileId = BlockCacheFileIDGenerator.nextID diff --git a/core/src/test/scala/swaydb/core/TestData.scala b/core/src/test/scala/swaydb/core/TestData.scala index 13d5488c8..42c138ed0 100644 --- a/core/src/test/scala/swaydb/core/TestData.scala +++ b/core/src/test/scala/swaydb/core/TestData.scala @@ -40,6 +40,7 @@ import swaydb.core.data.Value.{FromValue, FromValueOption, RangeValue} import swaydb.core.data.{KeyValue, _} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.level.seek._ import swaydb.core.level.zero.LevelZero @@ -99,7 +100,7 @@ object TestData { implicit class ReopenSegment(segment: PersistentSegment)(implicit keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default, ec: ExecutionContext, keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = fileSweeper, + fileSweeper: FileSweeperActor = fileSweeper, bufferCleaner: ByteBufferSweeperActor = bufferCleaner, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache, @@ -248,7 +249,7 @@ object TestData { def reopen(segmentSize: Int = level.minSegmentSize, throttle: LevelMeter => Throttle = level.throttle, nextLevel: Option[NextLevel] = level.nextLevel)(implicit keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper = fileSweeper): Level = + fileSweeper: FileSweeperActor = fileSweeper): Level = tryReopen( segmentSize = segmentSize, throttle = throttle, @@ -258,7 +259,7 @@ object TestData { def tryReopen(segmentSize: Int = level.minSegmentSize, throttle: LevelMeter => Throttle = level.throttle, nextLevel: Option[NextLevel] = level.nextLevel)(implicit keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, - fileSweeper: FileSweeper.Enabled = fileSweeper, + fileSweeper: FileSweeperActor = fileSweeper, bufferCleaner: ByteBufferSweeperActor = bufferCleaner, blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache): IO[swaydb.Error.Level, Level] = level.releaseLocks flatMap { @@ -295,7 +296,7 @@ object TestData { def reopen(mapSize: Long = level.maps.map.size)(implicit keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax, timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long, - fileSweeper: FileSweeper.Enabled = fileSweeper, + fileSweeper: FileSweeperActor = fileSweeper, bufferCleaner: ByteBufferSweeperActor = bufferCleaner): LevelZero = { val reopened = level.releaseLocks flatMap { diff --git a/core/src/test/scala/swaydb/core/TestSweeper.scala b/core/src/test/scala/swaydb/core/TestSweeper.scala index bdf328718..f75288ae3 100644 --- a/core/src/test/scala/swaydb/core/TestSweeper.scala +++ b/core/src/test/scala/swaydb/core/TestSweeper.scala @@ -27,6 +27,7 @@ package swaydb.core import swaydb.core.CommonAssertions._ import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.data.config.{ActorConfig, MemoryCache} import swaydb.data.util.StorageUnits._ @@ -62,8 +63,8 @@ private[swaydb] object TestSweeper { def randomBlockCache: Option[BlockCache.State] = orNone(blockCache) - val fileSweeper: FileSweeper.Enabled = - FileSweeper(50, ActorConfig.Basic("Basic test 3", level0PushDownPool)) + val fileSweeper: FileSweeperActor = + FileSweeper(50, ActorConfig.Basic("Basic test 3", level0PushDownPool)).value(()) val bufferCleaner: ByteBufferSweeperActor = ByteBufferSweeper()(TestData.scheduler) } diff --git a/core/src/test/scala/swaydb/core/io/file/ByteBufferSweeperSpec.scala b/core/src/test/scala/swaydb/core/io/file/ByteBufferSweeperSpec.scala index 3c0c3ecfa..ef7108c11 100644 --- a/core/src/test/scala/swaydb/core/io/file/ByteBufferSweeperSpec.scala +++ b/core/src/test/scala/swaydb/core/io/file/ByteBufferSweeperSpec.scala @@ -56,7 +56,7 @@ class ByteBufferSweeperSpec extends TestBase { implicit val terminateTimeout = 10.seconds "clear a MMAP file" in { - implicit val fileSweeper = FileSweeper(0, ActorConfig.Basic("FileSweet test - clear a MMAP file", TestExecutionContext.executionContext)) + implicit val fileSweeper = FileSweeper(0, ActorConfig.Basic("FileSweet test - clear a MMAP file", TestExecutionContext.executionContext)).actor implicit val cleaner: ByteBufferSweeperActor = ByteBufferSweeper() cleaner.actor.terminateAfter(10.seconds) @@ -85,7 +85,7 @@ class ByteBufferSweeperSpec extends TestBase { "it should not fatal terminate" when { "concurrently reading a deleted MMAP file" in { - implicit val fileSweeper = FileSweeper(1, ActorConfig.Timer("FileSweeper Test Timer", 1.second, TestExecutionContext.executionContext)) + implicit val fileSweeper = FileSweeper(1, ActorConfig.Timer("FileSweeper Test Timer", 1.second, TestExecutionContext.executionContext)).actor implicit val cleaner: ByteBufferSweeperActor = ByteBufferSweeper() cleaner.actor.terminateAfter(10.seconds) @@ -124,7 +124,7 @@ class ByteBufferSweeperSpec extends TestBase { sleep(timeout) fileSweeper.terminateAndRecover(terminateTimeout).await(terminateTimeout) - fileSweeper.messageCount() shouldBe 0 + fileSweeper.messageCount shouldBe 0 cleaner.actor.terminateAndRecover(terminateTimeout).await(terminateTimeout) cleaner.actor.messageCount shouldBe 0 diff --git a/core/src/test/scala/swaydb/core/io/file/DBFileSpec.scala b/core/src/test/scala/swaydb/core/io/file/DBFileSpec.scala index 772b43304..881cfe045 100644 --- a/core/src/test/scala/swaydb/core/io/file/DBFileSpec.scala +++ b/core/src/test/scala/swaydb/core/io/file/DBFileSpec.scala @@ -35,6 +35,7 @@ import swaydb.core.CommonAssertions.randomIOStrategy import swaydb.core.TestData._ import swaydb.core.actor.FileSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.util.PipeOps._ import swaydb.core.{TestBase, TestSweeper} import swaydb.data.slice.Slice @@ -42,7 +43,7 @@ import swaydb.data.util.OperatingSystem class DBFileSpec extends TestBase with MockFactory { - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeper = TestSweeper.memorySweeper10 @@ -142,7 +143,7 @@ class DBFileSpec extends TestBase with MockFactory { val bytes = randomBytesSlice() // //opening a file should trigger the onOpen function - // implicit val fileSweeper = mock[FileSweeper.Enabled] + // implicit val fileSweeper = mock[FileSweeperActor] // // fileSweeper.close _ expects * onCall { // dbFile: FileSweeperItem => @@ -201,7 +202,7 @@ class DBFileSpec extends TestBase with MockFactory { val bytes = randomBytesSlice() // //opening a file should trigger the onOpen function - // implicit val fileSweeper = mock[FileSweeper.Enabled] + // implicit val fileSweeper = mock[FileSweeperActor] // fileSweeper.close _ expects * onCall { // dbFile: FileSweeperItem => // dbFile.path shouldBe testFile @@ -268,7 +269,7 @@ class DBFileSpec extends TestBase with MockFactory { val bytes = Slice("bytes one".getBytes()) // //opening a file should trigger the onOpen function - // implicit val fileSweeper = mock[FileSweeper.Enabled] + // implicit val fileSweeper = mock[FileSweeperActor] // fileSweeper.close _ expects * onCall { // dbFile: FileSweeperItem => // dbFile.path shouldBe testFile diff --git a/core/src/test/scala/swaydb/core/level/LevelCollpaseSpec.scala b/core/src/test/scala/swaydb/core/level/LevelCollpaseSpec.scala index 4e83fa965..dcf1cbeb4 100644 --- a/core/src/test/scala/swaydb/core/level/LevelCollpaseSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelCollpaseSpec.scala @@ -29,6 +29,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.level.zero.LevelZeroSkipListMerger @@ -74,7 +75,7 @@ sealed trait LevelCollapseSpec extends TestBase { // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelCopySpec.scala b/core/src/test/scala/swaydb/core/level/LevelCopySpec.scala index f4a652c5f..bbf11c70e 100644 --- a/core/src/test/scala/swaydb/core/level/LevelCopySpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelCopySpec.scala @@ -32,6 +32,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.io.file.BlockCache import swaydb.core.level.zero.LevelZeroSkipListMerger @@ -75,7 +76,7 @@ sealed trait LevelCopySpec extends TestBase with MockFactory with PrivateMethodT // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelKeyValuesSpec.scala b/core/src/test/scala/swaydb/core/level/LevelKeyValuesSpec.scala index d3b9d93dd..fd9eefb20 100644 --- a/core/src/test/scala/swaydb/core/level/LevelKeyValuesSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelKeyValuesSpec.scala @@ -31,6 +31,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.level.zero.LevelZeroSkipListMerger @@ -77,7 +78,7 @@ sealed trait LevelKeyValuesSpec extends TestBase with MockFactory with PrivateMe // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelMapSpec.scala b/core/src/test/scala/swaydb/core/level/LevelMapSpec.scala index bf6f9781d..6e177d7bc 100644 --- a/core/src/test/scala/swaydb/core/level/LevelMapSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelMapSpec.scala @@ -35,6 +35,7 @@ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.actor.ByteBufferSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.level.zero.LevelZeroSkipListMerger import swaydb.core.map.{Map, MapEntry, SkipListMerger} import swaydb.core.segment.ThreadReadState @@ -78,7 +79,7 @@ sealed trait LevelMapSpec extends TestBase with MockFactory with PrivateMethodTe // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelRefreshSpec.scala b/core/src/test/scala/swaydb/core/level/LevelRefreshSpec.scala index 54ceb6673..0cbe45efe 100644 --- a/core/src/test/scala/swaydb/core/level/LevelRefreshSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelRefreshSpec.scala @@ -29,6 +29,7 @@ import org.scalatest.PrivateMethodTester import swaydb.IOValues._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.level.zero.LevelZeroSkipListMerger @@ -74,7 +75,7 @@ sealed trait LevelRefreshSpec extends TestBase with MockFactory with PrivateMeth // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelRemoveSegmentSpec.scala b/core/src/test/scala/swaydb/core/level/LevelRemoveSegmentSpec.scala index c8ca73a21..97c8ed0ea 100644 --- a/core/src/test/scala/swaydb/core/level/LevelRemoveSegmentSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelRemoveSegmentSpec.scala @@ -29,6 +29,7 @@ import org.scalatest.PrivateMethodTester import swaydb.IOValues._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.level.zero.LevelZeroSkipListMerger import swaydb.core.segment.format.a.block.segment.SegmentBlock @@ -69,7 +70,7 @@ sealed trait LevelRemoveSegmentSpec extends TestBase with MockFactory with Priva // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelSegmentSpec.scala b/core/src/test/scala/swaydb/core/level/LevelSegmentSpec.scala index 1a157baab..b395b4dd8 100644 --- a/core/src/test/scala/swaydb/core/level/LevelSegmentSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelSegmentSpec.scala @@ -32,6 +32,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.io.file.Effect._ @@ -80,7 +81,7 @@ sealed trait LevelSegmentSpec extends TestBase with MockFactory { // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/LevelSpec.scala b/core/src/test/scala/swaydb/core/level/LevelSpec.scala index 2104366a3..0bbee1ad7 100644 --- a/core/src/test/scala/swaydb/core/level/LevelSpec.scala +++ b/core/src/test/scala/swaydb/core/level/LevelSpec.scala @@ -32,6 +32,7 @@ import swaydb.IO import swaydb.IOValues._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.io.file.Effect @@ -83,7 +84,7 @@ sealed trait LevelSpec extends TestBase with MockFactory with PrivateMethodTeste // override def deleteFiles: Boolean = // false - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactionSpec.scala b/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactionSpec.scala index 7c824d34c..6a69a78e1 100644 --- a/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactionSpec.scala +++ b/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactionSpec.scala @@ -31,6 +31,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.Memory import swaydb.core.level.NextLevel @@ -76,7 +77,7 @@ sealed trait CompactionSpec extends TestBase with MockFactory { implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long implicit val timer = TestTimer.Empty - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeper10 "putForward" should { diff --git a/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactorSpec.scala b/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactorSpec.scala index 3c9bf66f5..18f659fa7 100644 --- a/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactorSpec.scala +++ b/core/src/test/scala/swaydb/core/level/compaction/throttle/ThrottleCompactorSpec.scala @@ -29,6 +29,7 @@ import org.scalatest.OptionValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.level.compaction.{Compaction, Compactor} import swaydb.core.{TestBase, TestExecutionContext, TestSweeper, TestTimer} @@ -71,7 +72,7 @@ sealed trait ThrottleCompactorSpec extends TestBase with MockFactory { implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long implicit val timer = TestTimer.Empty - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeper10 implicit val compactionOrdering = ThrottleLevelOrdering diff --git a/core/src/test/scala/swaydb/core/level/tool/AppendixRepairerSpec.scala b/core/src/test/scala/swaydb/core/level/tool/AppendixRepairerSpec.scala index 0f355c657..e950794c1 100644 --- a/core/src/test/scala/swaydb/core/level/tool/AppendixRepairerSpec.scala +++ b/core/src/test/scala/swaydb/core/level/tool/AppendixRepairerSpec.scala @@ -30,6 +30,7 @@ import swaydb.IOValues._ import swaydb.core.CommonAssertions._ import swaydb.core.RunThis._ import swaydb.core.TestData._ +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.io.file.Effect import swaydb.core.io.file.Effect._ @@ -50,7 +51,7 @@ class AppendixRepairerSpec extends TestBase { implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeper10 "AppendixRepair" should { diff --git a/core/src/test/scala/swaydb/core/map/MapEntrySpec.scala b/core/src/test/scala/swaydb/core/map/MapEntrySpec.scala index 54cfd7a46..0f687ef56 100644 --- a/core/src/test/scala/swaydb/core/map/MapEntrySpec.scala +++ b/core/src/test/scala/swaydb/core/map/MapEntrySpec.scala @@ -32,6 +32,7 @@ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.cache.CacheNoIO import swaydb.core.data.{Memory, MemoryOption, Value} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.io.reader.Reader import swaydb.core.map.serializer._ @@ -52,7 +53,7 @@ class MapEntrySpec extends TestBase { implicit val keyOrder = KeyOrder.default implicit def testTimer: TestTimer = TestTimer.Empty - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long diff --git a/core/src/test/scala/swaydb/core/map/MapSpec.scala b/core/src/test/scala/swaydb/core/map/MapSpec.scala index 333de5c20..9557a769e 100644 --- a/core/src/test/scala/swaydb/core/map/MapSpec.scala +++ b/core/src/test/scala/swaydb/core/map/MapSpec.scala @@ -34,6 +34,7 @@ import swaydb.core.TestData._ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data.{Memory, MemoryOption, Value} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect._ import swaydb.core.io.file.{BlockCache, DBFile} import swaydb.core.level.AppendixSkipListMerger @@ -60,7 +61,7 @@ class MapSpec extends TestBase { implicit def testTimer: TestTimer = TestTimer.Empty - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit val skipListMerger = LevelZeroSkipListMerger diff --git a/core/src/test/scala/swaydb/core/map/MapsSpec.scala b/core/src/test/scala/swaydb/core/map/MapsSpec.scala index 67bc0f901..0cd888997 100644 --- a/core/src/test/scala/swaydb/core/map/MapsSpec.scala +++ b/core/src/test/scala/swaydb/core/map/MapsSpec.scala @@ -35,6 +35,7 @@ import swaydb.core.actor.FileSweeper import swaydb.core.data.{Memory, MemoryOption, Value} import swaydb.core.actor.ByteBufferSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect._ import swaydb.core.level.zero.LevelZeroSkipListMerger import swaydb.core.util.Extension @@ -56,7 +57,7 @@ class MapsSpec extends TestBase { implicit val keyOrder = KeyOrder.default implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeper = TestSweeper.memorySweeperMax diff --git a/core/src/test/scala/swaydb/core/map/package.scala b/core/src/test/scala/swaydb/core/map/package.scala index e4b75c4be..87028a5a0 100644 --- a/core/src/test/scala/swaydb/core/map/package.scala +++ b/core/src/test/scala/swaydb/core/map/package.scala @@ -31,6 +31,7 @@ import swaydb.core.data.{Memory, MemoryOption} import swaydb.core.function.FunctionStore import swaydb.core.actor.ByteBufferSweeper import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.map.serializer.{MapEntryReader, MapEntryWriter} import swaydb.data.config.MMAP import swaydb.data.order.{KeyOrder, TimeOrder} @@ -48,7 +49,7 @@ package object map { def reopen(implicit keyOrder: KeyOrder[Slice[Byte]], timeOrder: TimeOrder[Slice[Byte]], functionStore: FunctionStore, - fileSweeper: FileSweeper, + fileSweeper: FileSweeperActor, bufferCleaner: ByteBufferSweeperActor, ec: ExecutionContext, writer: MapEntryWriter[MapEntry.Put[Slice[Byte], Memory]], diff --git a/core/src/test/scala/swaydb/core/map/serializer/AppendixMapEntrySpec.scala b/core/src/test/scala/swaydb/core/map/serializer/AppendixMapEntrySpec.scala index ea9dc90f3..14036ceea 100644 --- a/core/src/test/scala/swaydb/core/map/serializer/AppendixMapEntrySpec.scala +++ b/core/src/test/scala/swaydb/core/map/serializer/AppendixMapEntrySpec.scala @@ -30,6 +30,7 @@ import swaydb.core.CommonAssertions._ import swaydb.core.TestData._ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.io.reader.Reader import swaydb.core.map.MapEntry @@ -46,7 +47,7 @@ import swaydb.serializers._ class AppendixMapEntrySpec extends TestBase { implicit val keyOrder = KeyOrder.default - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit def blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache diff --git a/core/src/test/scala/swaydb/core/map/serializer/MapCodecSpec.scala b/core/src/test/scala/swaydb/core/map/serializer/MapCodecSpec.scala index b1368223b..4bc79537e 100644 --- a/core/src/test/scala/swaydb/core/map/serializer/MapCodecSpec.scala +++ b/core/src/test/scala/swaydb/core/map/serializer/MapCodecSpec.scala @@ -33,6 +33,7 @@ import swaydb.core.TestData._ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.data._ import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.segment.SegmentIO import swaydb.core.util.skiplist.SkipList @@ -48,7 +49,7 @@ class MapCodecSpec extends TestBase { implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default implicit def testTimer: TestTimer = TestTimer.Empty - implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper implicit val cleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val memorySweeperImplicitSweeper: Option[MemorySweeper.All] = TestSweeper.memorySweeperMax implicit def blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache diff --git a/core/src/test/scala/swaydb/core/segment/SegmentSerialiserSpec.scala b/core/src/test/scala/swaydb/core/segment/SegmentSerialiserSpec.scala index 935bdaacf..be2ec4874 100644 --- a/core/src/test/scala/swaydb/core/segment/SegmentSerialiserSpec.scala +++ b/core/src/test/scala/swaydb/core/segment/SegmentSerialiserSpec.scala @@ -29,6 +29,7 @@ import swaydb.core.RunThis._ import swaydb.core.TestData._ import swaydb.core.actor.{FileSweeper, MemorySweeper} import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.BlockCache import swaydb.core.io.reader.Reader import swaydb.core.{TestBase, TestSweeper} @@ -43,7 +44,7 @@ class SegmentSerialiserSpec extends TestBase { implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long implicit val blockCache: Option[BlockCache.State] = TestSweeper.randomBlockCache diff --git a/core/src/test/scala/swaydb/core/segment/format/a/SegmentWriteSpec.scala b/core/src/test/scala/swaydb/core/segment/format/a/SegmentWriteSpec.scala index 3ce1f0baf..e3454053c 100644 --- a/core/src/test/scala/swaydb/core/segment/format/a/SegmentWriteSpec.scala +++ b/core/src/test/scala/swaydb/core/segment/format/a/SegmentWriteSpec.scala @@ -36,6 +36,7 @@ import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper} import swaydb.core.data.Value.FromValue import swaydb.core.data._ import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.io.file.Effect._ import swaydb.core.io.file.{BlockCache, Effect} import swaydb.core.level.PathsDistributor @@ -97,7 +98,7 @@ sealed trait SegmentWriteSpec extends TestBase { // override def deleteFiles = false - implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper + implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner "Segment" should { @@ -551,7 +552,7 @@ sealed trait SegmentWriteSpec extends TestBase { runThis(10.times) { // implicit val fileSweeper = FileSweeper.Disabled implicit val blockCache: Option[BlockCache.State] = None - implicit val fileSweeper = FileSweeper(50, ActorConfig.TimeLoop("", 10.seconds, TestExecutionContext.executionContext)) + implicit val fileSweeper = FileSweeper(50, ActorConfig.TimeLoop("", 10.seconds, TestExecutionContext.executionContext)).actor val keyValues = randomizedKeyValues(keyValuesCount) val segment = TestSegment(keyValues) @@ -659,7 +660,7 @@ sealed trait SegmentWriteSpec extends TestBase { } else { runThis(5.times, log = true) { implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.memorySweeperMax - implicit val segmentOpenLimit = FileSweeper(1, ActorConfig.TimeLoop("", 2.second, ec)) + implicit val fileSweeper = FileSweeper(1, ActorConfig.TimeLoop("", 2.second, ec)).actor val keyValues = randomizedKeyValues(keyValuesCount) @@ -669,14 +670,14 @@ sealed trait SegmentWriteSpec extends TestBase { TestSegment( keyValues = keyValues, segmentConfig = segmentConfig - )(keyOrder, memorySweeper, segmentOpenLimit) + )(keyOrder, memorySweeper, fileSweeper) segment1.isOpen shouldBe !segmentConfig.cacheBlocksOnCreate segment1.getKeyValueCount() shouldBe keyValues.size segment1.isOpen shouldBe !segmentConfig.cacheBlocksOnCreate //create another segment should close segment 1 - val segment2 = TestSegment(keyValues)(keyOrder, memorySweeper, segmentOpenLimit) + val segment2 = TestSegment(keyValues)(keyOrder, memorySweeper, fileSweeper) segment2.getKeyValueCount() shouldBe keyValues.size eventual(5.seconds) { @@ -699,7 +700,7 @@ sealed trait SegmentWriteSpec extends TestBase { segment1.close segment2.close - segmentOpenLimit.terminateAndRecover[Future](10.seconds).await(10.seconds) + fileSweeper.terminateAndRecover[Future](10.seconds).await(10.seconds) } } } diff --git a/data/src/main/scala/swaydb/Actor.scala b/data/src/main/scala/swaydb/Actor.scala index ad9b1d5ac..6def5ec45 100644 --- a/data/src/main/scala/swaydb/Actor.scala +++ b/data/src/main/scala/swaydb/Actor.scala @@ -101,6 +101,28 @@ object Actor { case object TerminatedActor extends Actor.Error } + def deadActor[T, S](): ActorRef[T, S] = + new ActorRef[T, S] { + override def name: String = "Dead actor" + override def executionContext: ExecutionContext = throw new Exception("Dead Actor") + override def send(message: T): Unit = throw new Exception("Dead Actor") + override def ask[R, X[_]](message: ActorRef[R, Unit] => T)(implicit bag: Bag.Async[X]): X[R] = throw new Exception("Dead Actor") + override def send(message: T, delay: FiniteDuration)(implicit scheduler: Scheduler): TimerTask = throw new Exception("Dead Actor") + override def ask[R, X[_]](message: ActorRef[R, Unit] => T, delay: FiniteDuration)(implicit scheduler: Scheduler, bag: Bag.Async[X]): Task[R, X] = throw new Exception("Dead Actor") + override def totalWeight: Int = throw new Exception("Dead Actor") + override def messageCount: Int = throw new Exception("Dead Actor") + override def isEmpty: Boolean = throw new Exception("Dead Actor") + override def recover[M <: T, E: ExceptionHandler](f: (M, IO[E, Error], Actor[T, S]) => Unit): ActorRef[T, S] = throw new Exception("Dead Actor") + override def receiveAllForce[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit scheduler: Scheduler, bag: Bag.Async[BAG]): BAG[Unit] = throw new Exception("Dead Actor") + override def receiveAllBlocking(retryCounts: Int): Try[Unit] = throw new Exception("Dead Actor") + override def terminate(): Unit = throw new Exception("Dead Actor") + override def terminateAfter(timeout: FiniteDuration)(implicit scheduler: Scheduler): ActorRef[T, S] = throw new Exception("Dead Actor") + override def isTerminated: Boolean = throw new Exception("Dead Actor") + override def clear(): Unit = throw new Exception("Dead Actor") + override def terminateAndClear(): Unit = throw new Exception("Dead Actor") + override def terminateAndRecover[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit scheduler: Scheduler, bag: Bag.Async[BAG]): BAG[Unit] = throw new Exception("Dead Actor") + } + def cacheFromConfig[T](config: ActorConfig, stashCapacity: Int, weigher: T => Int)(execution: (T, Actor[T, Unit]) => Unit): ActorRef[T, Unit] = diff --git a/swaydb/src/main/scala/swaydb/SwayDB.scala b/swaydb/src/main/scala/swaydb/SwayDB.scala index 593b72323..61a1d2987 100644 --- a/swaydb/src/main/scala/swaydb/SwayDB.scala +++ b/swaydb/src/main/scala/swaydb/SwayDB.scala @@ -31,6 +31,7 @@ import com.typesafe.scalalogging.LazyLogging import swaydb.configs.level.SingleThreadFactory import swaydb.core.Core import swaydb.core.actor.FileSweeper +import swaydb.core.actor.FileSweeper.FileSweeperActor import swaydb.core.data._ import swaydb.core.function.FunctionStore import swaydb.core.level.tool.AppendixRepairer @@ -218,7 +219,7 @@ object SwayDB extends LazyLogging { */ def repairAppendix[K](levelPath: Path, repairStrategy: AppendixRepairStrategy)(implicit serializer: Serializer[K], - fileSweeper: FileSweeper.Enabled, + fileSweeper: FileSweeperActor, keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default): IO[swaydb.Error.Level, RepairResult[K]] = //convert to typed result. AppendixRepairer(levelPath, repairStrategy) match {