Skip to content

Commit

Permalink
FileSweeper improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Aug 13, 2020
1 parent cc497ea commit 77a1f33
Show file tree
Hide file tree
Showing 51 changed files with 240 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import swaydb.core.CommonAssertions.randomIOStrategy
import swaydb.core.TestData._
import swaydb.core.actor.FileSweeper
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.segment.format.a.block.reader.BlockRefReader
import swaydb.core.util.{Benchmark, BlockCacheFileIDGenerator, Bytes}
import swaydb.core.{TestBase, TestSweeper}
Expand All @@ -45,7 +46,7 @@ import scala.util.Random

class DBFileWriteReadPerformanceSpec extends TestBase {

implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner
implicit val memorySweeper = TestSweeper.memorySweeperMax

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//
// implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default
// implicit def testTimer: TestTimer = TestTimer.random
// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
// implicit val memorySweeper = TestSweeper.memorySweeperMax
// implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package swaydb.core.segment.format.a

import swaydb.core.TestData._
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.actor.{FileSweeper, MemorySweeper}
import swaydb.core.data.Memory
import swaydb.core.io.file.BlockCache
Expand Down Expand Up @@ -75,7 +76,7 @@ sealed trait SegmentReadPerformanceSpec extends TestBase {
val keyValuesCount = 1000000
// override def deleteFiles = false

implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeper.Enabled = TestSweeper.fileSweeper
implicit val maxOpenSegmentsCacheImplicitLimiter: FileSweeperActor = TestSweeper.fileSweeper
// implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.someMemorySweeperMax
// implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = TestSweeper.someMemorySweeper10
implicit val keyValueMemorySweeper: Option[MemorySweeper.KeyValue] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ package swaydb.core.segment.format.a.block

import swaydb.IOValues._
import swaydb.core.TestData._
import swaydb.core.actor.FileSweeper
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.io.file.{BlockCache, DBFile}
import swaydb.core.io.reader.FileReader
import swaydb.core.segment.format.a.block.reader.{BlockReader, BlockRefReader}
Expand All @@ -39,7 +39,7 @@ import swaydb.data.util.StorageUnits._

class BlockReaderPerformanceSpec extends TestBase {

implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner
implicit val memorySweeper = TestSweeper.memorySweeperMax

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import swaydb.core.TestData._
import swaydb.core.TestSweeper.fileSweeper
import swaydb.core.actor.FileSweeper
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.util.{Benchmark, BlockCacheFileIDGenerator}
import swaydb.core.{TestBase, TestSweeper}
import swaydb.data.util.OperatingSystem
Expand All @@ -40,7 +41,7 @@ import scala.concurrent.duration._

class DBFileStressWriteSpec extends TestBase {

implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
implicit val bufferCleaner: ByteBufferSweeperActor = TestSweeper.bufferCleaner
implicit val memorySweeper = TestSweeper.memorySweeperMax

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
// implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default
//
// implicit val skipListMerger = LevelZeroSkipListMerger
// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
// implicit val memorySweeper = TestSweeper.memorySweeperMax
// implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
// implicit val keyOrder: KeyOrder[Slice[Byte]] = KeyOrder.default
// implicit val timeOrder: TimeOrder[Slice[Byte]] = TimeOrder.long
// implicit def testTimer: TestTimer = TestTimer.Empty
// implicit val fileSweeper: FileSweeper.Enabled = TestSweeper.fileSweeper
// implicit val fileSweeper: FileSweeperActor = TestSweeper.fileSweeper
// implicit val memorySweeper = TestSweeper.memorySweeperMax
//
// import swaydb.core.map.serializer.LevelZeroMapEntryReader._
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/swaydb/core/CoreInitializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import swaydb.data.config._
import swaydb.data.order.{KeyOrder, TimeOrder}
import swaydb.data.slice.Slice
import swaydb.data.storage.{AppendixStorage, LevelStorage}
import swaydb.{ActorWire, Bag, Error, IO, Scheduler}
import swaydb.{ActorRef, ActorWire, Bag, Error, IO, Scheduler}

import scala.concurrent.Await
import scala.concurrent.duration._
Expand Down Expand Up @@ -124,7 +124,7 @@ private[core] object CoreInitializer extends LazyLogging {
timeOrder: TimeOrder[Slice[Byte]],
functionStore: FunctionStore): IO[swaydb.Error.Boot, Core[Bag.Less]] = {

implicit val fileSweeper: FileSweeper.Enabled =
implicit val fileSweeper: ActorRef[FileSweeper.Command, Unit] =
FileSweeper(fileCache)

val memorySweeper: Option[MemorySweeper.Enabled] =
Expand Down Expand Up @@ -152,7 +152,7 @@ private[core] object CoreInitializer extends LazyLogging {
ThrottleCompactor

implicit val bufferCleaner: ByteBufferSweeperActor =
ByteBufferSweeper()(Scheduler()(fileSweeper.ec))
ByteBufferSweeper()(Scheduler()(fileSweeper.executionContext))

def createLevel(id: Long,
nextLevel: Option[NextLevel],
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/swaydb/core/CoreShutdown.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@
package swaydb.core

import com.typesafe.scalalogging.LazyLogging
import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper}
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper}
import swaydb.core.io.file.BlockCache
import swaydb.core.level.compaction.Compactor
import swaydb.core.level.compaction.throttle.ThrottleState
Expand All @@ -48,7 +49,7 @@ private[core] object CoreShutdown extends LazyLogging {

def shutdown(zero: LevelZero,
retryOnBusyDelay: FiniteDuration)(implicit compactor: ActorWire[Compactor[ThrottleState], ThrottleState],
fileSweeper: Option[FileSweeper.Enabled],
fileSweeper: Option[FileSweeperActor],
blockCache: Option[BlockCache.State],
keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
scheduler: Scheduler,
Expand Down Expand Up @@ -111,7 +112,7 @@ private[core] object CoreShutdown extends LazyLogging {
_ =>
fileSweeper match {
case Some(fileSweeper) =>
logger.info(s"Terminating ${classOf[FileSweeper].getSimpleName}Actor.")
logger.info(s"Terminating FileSweeperActor.")
fileSweeper.terminateAndRecover(retryOnBusyDelay)

case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private[core] object ByteBufferSweeper extends LazyLogging {

implicit class ByteBufferSweeperActorImplicits(cache: ByteBufferSweeperActor) {
@inline def actor: ActorRef[Command, State] =
cache.value(())
this.cache.value(())
}

/**
Expand Down
139 changes: 51 additions & 88 deletions core/src/main/scala/swaydb/core/actor/FileSweeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ package swaydb.core.actor
import java.nio.file.Path

import com.typesafe.scalalogging.LazyLogging
import swaydb.core.cache.Lazy
import swaydb.core.actor.ByteBufferSweeper.{ByteBufferSweeperActor, State}
import swaydb.core.cache.{Cache, CacheNoIO}
import swaydb.data.config.{ActorConfig, FileCache}
import swaydb.{Actor, ActorRef, Bag, IO, Scheduler}
import swaydb.{Actor, ActorRef, IO}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.ref.WeakReference

private[core] trait FileSweeperItem {
Expand All @@ -41,76 +40,80 @@ private[core] trait FileSweeperItem {
def isOpen: Boolean
}

private[swaydb] trait FileSweeper {
def close(file: FileSweeperItem): Unit
}
/**
* Actor that manages closing and delete files that are overdue.
*/
private[swaydb] object FileSweeper extends LazyLogging {

/**
* Disables File management. This is generally enabled for in-memory databases
* where closing or deleting files is not really required since GC cleans up these
* files.
*/
case object Disabled extends FileSweeper {
def close(file: FileSweeperItem): Unit = ()
}
type FileSweeperActor = ActorRef[FileSweeper.Command, Unit]

/**
* Enables file management.
*/
sealed trait Enabled extends FileSweeper {
def ec: ExecutionContext
def close(file: FileSweeperItem): Unit
def delete(file: FileSweeperItem): Unit
def messageCount(): Int
def terminateAndRecover[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit bag: Bag.Async[BAG],
scheduler: Scheduler): BAG[Unit]
implicit class FileSweeperActorActorImplicits(cache: CacheNoIO[Unit, FileSweeperActor]) {
@inline def actor =
this.cache.value(())
}

private sealed trait Action {
sealed trait Command {
def isDelete: Boolean
}

private object Action {
case class Delete(file: FileSweeperItem) extends Action {
object Command {
//Delete cannot be a WeakReference because Levels can
//remove references to the file after eventualDelete is invoked.
//If the file gets garbage collected due to it being WeakReference before
//delete on the file is triggered, the physical file will remain on disk.
case class Delete(file: FileSweeperItem) extends Command {
def isDelete: Boolean = true
}
case class Close(file: WeakReference[FileSweeperItem]) extends Action {

object Close {
def apply(file: FileSweeperItem): Close =
new Close(new WeakReference[FileSweeperItem](file))
}
case class Close private(file: WeakReference[FileSweeperItem]) extends Command {
def isDelete: Boolean = false
}
}

def weigher(action: Action) =
if (action.isDelete) 10 else 1
def weigher(command: Command): Int =
if (command.isDelete) 10 else 1

def apply(fileCache: FileCache): Option[FileSweeper.Enabled] =
def apply(fileCache: FileCache): Option[FileSweeperActor] =
fileCache match {
case FileCache.Disable =>
None

case enable: FileCache.Enable =>
Some(apply(enable))
}

def apply(fileCache: FileCache.Enable): FileSweeper.Enabled =

def apply(fileCache: FileCache.Enable): FileSweeperActor =
apply(
maxOpenSegments = fileCache.maxOpen,
actorConfig = fileCache.actorConfig
)
).value(())

def apply(maxOpenSegments: Int,
actorConfig: ActorConfig): CacheNoIO[Unit, FileSweeperActor] =
Cache.noIO[Unit, ActorRef[Command, Unit]](synchronised = true, stored = true, initial = None) {
(_, _) =>
createActor(
maxOpenSegments = maxOpenSegments,
actorConfig = actorConfig
)
}

private def processAction(action: Action): Unit =
action match {
case Action.Delete(file) =>
private def processCommand(command: Command): Unit =
command match {
case Command.Delete(file) =>
try
file.delete()
catch {
case exception: Exception =>
logger.error(s"Failed to delete file. ${file.path}", exception)
}

case Action.Close(file) =>
case Command.Close(file) =>
file.get foreach {
file =>
try
Expand All @@ -122,68 +125,28 @@ private[swaydb] object FileSweeper extends LazyLogging {
}
}

private def createActor(maxOpenSegments: Int, actorConfig: ActorConfig): ActorRef[Action, Unit] =
Actor.cacheFromConfig[Action](
private def createActor(maxOpenSegments: Int, actorConfig: ActorConfig): ActorRef[Command, Unit] =
Actor.cacheFromConfig[Command](
config = actorConfig,
stashCapacity = maxOpenSegments,
weigher = FileSweeper.weigher
) {
case (action, _) =>
processAction(action)
} recoverException[Action] {
case (action, io, _) =>
case (command, _) =>
processCommand(command)
} recoverException[Command] {
case (command, io, _) =>
io match {
case IO.Right(Actor.Error.TerminatedActor) =>
processAction(action)
processCommand(command)

case IO.Left(exception) =>
action match {
case Action.Delete(file) =>
command match {
case Command.Delete(file) =>
logger.error(s"Failed to delete file. Path = ${file.path}.", exception)

case Action.Close(file) =>
case Command.Close(file) =>
logger.error(s"Failed to close file. WeakReference(path: Path) = ${file.get.map(_.path)}.", exception)
}
}
}

def apply(maxOpenSegments: Int, actorConfig: ActorConfig): FileSweeper.Enabled = {
val lazyActor =
Lazy.value[ActorRef[Action, Unit]](
synchronised = true,
stored = true,
initial = None
)

def actor(): ActorRef[Action, Unit] =
lazyActor.getOrSet(createActor(maxOpenSegments, actorConfig))

new FileSweeper.Enabled {

override def ec = actorConfig.ec

override def close(file: FileSweeperItem): Unit =
actor() send Action.Close(new WeakReference[FileSweeperItem](file))

//Delete cannot be a WeakReference because Levels can
//remove references to the file after eventualDelete is invoked.
//If the file gets garbage collected due to it being WeakReference before
//delete on the file is triggered, the physical file will remain on disk.
override def delete(file: FileSweeperItem): Unit =
actor() send Action.Delete(file)

override def messageCount(): Int =
if (lazyActor.isDefined)
actor().messageCount
else
0

override def terminateAndRecover[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit bag: Bag.Async[BAG],
scheduler: Scheduler): BAG[Unit] =
if (lazyActor.isDefined)
actor().terminateAndRecover(retryOnBusyDelay)
else
bag.unit
}
}
}
Loading

0 comments on commit 77a1f33

Please sign in to comment.