Skip to content

Commit

Permalink
fixes #242 and added TestCaseSweeper to automatically delete files
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Aug 14, 2020
1 parent 77a1f33 commit 4d989bf
Show file tree
Hide file tree
Showing 54 changed files with 3,196 additions and 2,380 deletions.
19 changes: 13 additions & 6 deletions core/src/main/scala/swaydb/core/Core.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import swaydb.data.compaction.LevelMeter
import swaydb.data.config._
import swaydb.data.order.{KeyOrder, TimeOrder}
import swaydb.data.slice.{Slice, SliceOption}
import swaydb.data.util.Futures.FutureImplicits
import swaydb.data.util.TupleOrNone
import swaydb.{Bag, IO, OK, Prepare}

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -147,7 +149,8 @@ private[swaydb] object Core {

private[swaydb] class Core[BAG[_]](val zero: LevelZero,
threadStateCache: ThreadStateCache,
onClose: => IO.Defer[swaydb.Error.Close, Unit])(implicit bag: Bag[BAG]) {
onClose: FiniteDuration => Future[Unit])(implicit bag: Bag[BAG],
shutdownExecutionContext: ExecutionContext) {

private val serial = bag.createSerial()

Expand Down Expand Up @@ -308,11 +311,15 @@ private[swaydb] class Core[BAG[_]](val zero: LevelZero,
def levelMeter(levelNumber: Int): Option[LevelMeter] =
zero.meterFor(levelNumber)

def close(): BAG[Unit] =
onClose.run(0)
def close(retryInterval: FiniteDuration): BAG[Unit] =
IO.fromFuture(onClose(retryInterval)).run(0)

def delete(): BAG[Unit] =
onClose.flatMapIO(_ => zero.delete).run(0)
def deleteFuture(retryInterval: FiniteDuration): Future[Unit] =
onClose(retryInterval)
.and(zero.delete(retryInterval))

def delete(retryInterval: FiniteDuration): BAG[Unit] =
IO.fromFuture(deleteFuture(retryInterval)).run(0)

def clear(readState: ThreadReadState): BAG[OK] =
zero.run(_.clear(readState))
Expand All @@ -322,5 +329,5 @@ private[swaydb] class Core[BAG[_]](val zero: LevelZero,
zero = zero,
threadStateCache = threadStateCache,
onClose = onClose
)(bag)
)(bag, shutdownExecutionContext)
}
16 changes: 8 additions & 8 deletions core/src/main/scala/swaydb/core/CoreInitializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.nio.file.Paths

import com.typesafe.scalalogging.LazyLogging
import swaydb.Error.Level.ExceptionHandler
import swaydb.core.CoreShutdown.shutdown
import swaydb.core.CoreShutdown.close
import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper}
import swaydb.core.function.FunctionStore
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
Expand All @@ -50,7 +50,7 @@ import swaydb.data.slice.Slice
import swaydb.data.storage.{AppendixStorage, LevelStorage}
import swaydb.{ActorRef, ActorWire, Bag, Error, IO, Scheduler}

import scala.concurrent.Await
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

/**
Expand Down Expand Up @@ -240,17 +240,17 @@ private[core] object CoreInitializer extends LazyLogging {

sys.addShutdownHook {
implicit val scheduler = Scheduler()
Await.result(shutdown(zero, shutdownTimeout), shutdownTimeout)
implicit val bag = Bag.future
Await.result(IO.Defer(close(zero, shutdownTimeout)).run(0), shutdownTimeout)
}

//trigger initial wakeUp.
sendInitialWakeUp(compactor)

def onClose =
IO.fromFuture[swaydb.Error.Close, Unit] {
implicit val scheduler = Scheduler()
CoreShutdown.shutdown(zero, shutdownTimeout)
}
def onClose(retryInterval: FiniteDuration): Future[Unit] = {
implicit val scheduler = Scheduler()
CoreShutdown.close(zero, retryInterval)
}

new Core[Bag.Less](
zero = zero,
Expand Down
123 changes: 14 additions & 109 deletions core/src/main/scala/swaydb/core/CoreShutdown.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@
package swaydb.core

import com.typesafe.scalalogging.LazyLogging
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper}
import swaydb.core.io.file.BlockCache
import swaydb.core.level.compaction.Compactor
import swaydb.core.level.compaction.throttle.ThrottleState
import swaydb.core.level.zero.LevelZero
import swaydb.data.util.Futures
import swaydb.data.util.Futures.FutureImplicits
import swaydb.{ActorWire, Bag, Scheduler}

import scala.concurrent.Future
Expand All @@ -47,117 +43,26 @@ private[core] object CoreShutdown extends LazyLogging {
* - Flushes all files and persists them to disk for persistent databases.
*/

def shutdown(zero: LevelZero,
retryOnBusyDelay: FiniteDuration)(implicit compactor: ActorWire[Compactor[ThrottleState], ThrottleState],
fileSweeper: Option[FileSweeperActor],
blockCache: Option[BlockCache.State],
keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
scheduler: Scheduler,
cleaner: ByteBufferSweeperActor): Future[Unit] = {
def close(zero: LevelZero,
retryInterval: FiniteDuration)(implicit compactor: ActorWire[Compactor[ThrottleState], ThrottleState],
scheduler: Scheduler) = {
implicit val ec = scheduler.ec
implicit val futureBag = Bag.future(scheduler.ec)

logger.info("****** Shutting down ******")

logger.info("Stopping compaction!")
val compactionShutdown =
compactor
.ask
.flatMap {
(impl, state, self) =>
impl.terminate(state, self)
}
.recoverWith {
case exception =>
logger.error("Failed compaction shutdown.", exception)
Future.failed(exception)
}

val levelsShutdown =
compactionShutdown flatMap {
_ =>
logger.info("Closing files!")
zero.close onLeftSideEffect {
error =>
logger.error("Failed to close files.", error.exception)
} toFuture
}

val blockCacheShutdown =
levelsShutdown flatMap {
_ =>
logger.info("Clearing blockCache.")
blockCache match {
case Some(value) =>
Future.successful(value.clear())

case None =>
Futures.unit
}
}

val keyValueSweeperShutdown =
blockCacheShutdown flatMap {
_ =>
keyValueMemorySweeper.foreach {
sweeper =>
logger.info("Clearing cached key-values")
sweeper.terminateAndClear()
}
Futures.unit
}

val fileSweeperShutdown =
keyValueSweeperShutdown flatMap {
_ =>
fileSweeper match {
case Some(fileSweeper) =>
logger.info(s"Terminating FileSweeperActor.")
fileSweeper.terminateAndRecover(retryOnBusyDelay)

case None =>
Futures.unit
}
compactor
.ask
.flatMap {
(impl, state, self) =>
impl.terminate(state, self)
}

val bufferCleanerResult =
fileSweeperShutdown flatMap {
_ =>
logger.info(s"Terminating ByteBufferCleanerActor.")
cleaner.get() match {
case Some(actor) =>
actor.terminateAndRecover(retryOnBusyDelay) flatMap {
_ =>
logger.info(s"Terminated ByteBufferCleanerActor. Awaiting shutdown response.")
actor ask ByteBufferSweeper.Command.IsTerminatedAndCleaned[Unit]
} flatMap {
isShut =>
if (isShut)
Futures.`true`
else
Futures.`false`
}

case None =>
Futures.unit
}
.recoverWith {
case exception =>
logger.error("Failed compaction shutdown.", exception)
Future.failed(exception)
}

val releaseLocks =
bufferCleanerResult flatMap {
_ =>
logger.info("Releasing locks.")
zero.releaseLocks onLeftSideEffect {
error =>
logger.error("Failed to release locks.", error.exception)
} toFuture
}

releaseLocks flatMap {
_ =>
logger.info("Terminating Scheduler.")
scheduler.terminate()
Futures.unit
}
.and(zero.close(retryInterval))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,5 @@ private[core] object ByteBufferCleaner extends LazyLogging {
val errorMessage = s"ByteBuffer cleaner not initialised. Failed to clean MMAP file: ${bufferPath.toString}."
val exception = error.exception
logger.error(errorMessage, exception)
throw new Exception(errorMessage, exception) //also throw to output to stdout in-case logging is not enabled since this is critical.
}
}
56 changes: 55 additions & 1 deletion core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import swaydb.core.io.file.Effect
import swaydb.core.util.Counter
import swaydb.core.util.FiniteDurations._
import swaydb.data.config.ActorConfig.QueueOrder
import swaydb.data.util.Futures

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._


Expand Down Expand Up @@ -125,6 +127,59 @@ private[core] object ByteBufferSweeper extends LazyLogging {
case class State(var cleaner: Option[Cleaner],
pendingClean: mutable.HashMap[Path, Counter.Request[Command.Clean]])

def closeAsync[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit sweeper: ByteBufferSweeperActor,
bag: Bag.Async[BAG],
scheduler: Scheduler): BAG[Unit] =
sweeper.get() match {
case Some(actor) =>
bag.flatMap(actor.terminateAndRecoverAsync(retryOnBusyDelay)) {
_ =>
val ask = actor ask Command.IsTerminatedAndCleaned(resubmitted = false)
bag.transform(ask) {
isCleaned =>
if (isCleaned)
logger.info(s"${ByteBufferSweeper.getClass.getSimpleName.split("\\$").last} terminated!")
else
logger.error(s"Failed to terminate ${ByteBufferSweeper.getClass.getSimpleName.split("\\$").last}")
}
}

case None =>
bag.unit
}

/**
* Closes the [[ByteBufferSweeperActor]] synchronously. This simply calls the Actor function [[Actor.terminateAndRecoverSync]]
* and dispatches [[Command.IsTerminatedAndCleaned]] to assert that all files are closed and flushed.
*
* @param retryBlock
* @param timeout
* @param sweeper
* @param bag
* @param ec
* @tparam BAG
* @return
*/
def closeSync[BAG[_]](retryBlock: FiniteDuration,
timeout: FiniteDuration)(implicit sweeper: ByteBufferSweeperActor,
bag: Bag.Sync[BAG],
ec: ExecutionContext): BAG[Unit] =
sweeper.get() match {
case Some(actor) =>
bag.map(actor.terminateAndRecoverSync(retryBlock)) {
_ =>
val future = actor ask Command.IsTerminatedAndCleaned(resubmitted = false)
val isCleaned = Await.result(future, timeout)
if (isCleaned)
logger.info(s"${ByteBufferSweeper.getClass.getSimpleName.split("\\$").last} terminated!")
else
logger.error(s"Failed to terminate ${ByteBufferSweeper.getClass.getSimpleName.split("\\$").last}")
}

case None =>
bag.unit
}

/**
* Maintains the count of all delete request for each memory-mapped file.
*/
Expand Down Expand Up @@ -182,7 +237,6 @@ private[core] object ByteBufferSweeper extends LazyLogging {
val errorMessage = s"Failed to clean MappedByteBuffer at path '${path.toString}'."
val exception = error.exception
logger.error(errorMessage, exception)
throw IO.throwable(errorMessage, exception) //also throw to output to stdout in-case logging is not enabled since this is critical.
}

case None =>
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/swaydb/core/actor/FileSweeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ package swaydb.core.actor
import java.nio.file.Path

import com.typesafe.scalalogging.LazyLogging
import swaydb.core.actor.ByteBufferSweeper.{ByteBufferSweeperActor, State}
import swaydb.core.cache.{Cache, CacheNoIO}
import swaydb.data.config.{ActorConfig, FileCache}
import swaydb.{Actor, ActorRef, IO}
import swaydb.{Actor, ActorRef, Bag, IO, Scheduler}

import scala.concurrent.duration.FiniteDuration
import scala.ref.WeakReference

private[core] trait FileSweeperItem {
Expand Down Expand Up @@ -103,6 +103,21 @@ private[swaydb] object FileSweeper extends LazyLogging {
)
}

def closeAsync[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit fileSweeper: FileSweeperActor,
bag: Bag.Async[BAG],
scheduler: Scheduler): BAG[Unit] =
bag.transform(fileSweeper.terminateAndRecoverAsync(retryOnBusyDelay)) {
_ =>
logger.info(this.getClass.getSimpleName + " terminated!")
}

def closeSync[BAG[_]](retryOnBusyDelay: FiniteDuration)(implicit fileSweeper: FileSweeperActor,
bag: Bag.Sync[BAG]): BAG[Unit] =
bag.transform(fileSweeper.terminateAndRecoverSync(retryOnBusyDelay)) {
_ =>
logger.info(this.getClass.getSimpleName + " terminated!")
}

private def processCommand(command: Command): Unit =
command match {
case Command.Delete(file) =>
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/swaydb/core/actor/MemorySweeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package swaydb.core.actor

import com.typesafe.scalalogging.LazyLogging
import swaydb.core.data.{KeyValue, Persistent}
import swaydb.core.io.file.BlockCache
import swaydb.core.util.HashedMap
Expand Down Expand Up @@ -61,7 +62,7 @@ private[core] sealed trait MemorySweeper
* Cleared all cached data. [[MemorySweeper]] is not required for Memory only databases
* and zero databases.
*/
private[core] object MemorySweeper {
private[core] object MemorySweeper extends LazyLogging {

def apply(memoryCache: MemoryCache): Option[MemorySweeper.Enabled] =
memoryCache match {
Expand Down Expand Up @@ -100,6 +101,16 @@ private[core] object MemorySweeper {
)
}

def close(keyValueMemorySweeper: Option[MemorySweeper.KeyValue]): Unit =
keyValueMemorySweeper.foreach(close)

def close(sweeper: MemorySweeper.KeyValue): Unit =
sweeper.actor foreach {
actor =>
logger.info("Clearing cached key-values")
actor.terminateAndClear()
}

def weigher(entry: Command): Int =
entry match {
case command: Command.BlockCache =>
Expand Down
Loading

0 comments on commit 4d989bf

Please sign in to comment.