Skip to content

Commit

Permalink
#251 - implicit ForceSaveApplier
Browse files Browse the repository at this point in the history
  • Loading branch information
simerplaha committed Aug 27, 2020
1 parent b80f4e0 commit 5fa4ac1
Show file tree
Hide file tree
Showing 26 changed files with 273 additions and 99 deletions.
8 changes: 7 additions & 1 deletion core/src/main/scala/swaydb/core/CoreInitializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import swaydb.Error.Level.ExceptionHandler
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.{ByteBufferSweeper, FileSweeper, MemorySweeper}
import swaydb.core.function.FunctionStore
import swaydb.core.io.file.BlockCache
import swaydb.core.io.file.{BlockCache, ForceSaveApplier}
import swaydb.core.io.file.Effect._
import swaydb.core.level.compaction._
import swaydb.core.level.compaction.throttle.{ThrottleCompactor, ThrottleState}
Expand Down Expand Up @@ -181,6 +181,8 @@ private[core] object CoreInitializer extends LazyLogging {
config: LevelConfig): IO[swaydb.Error.Level, NextLevel] =
config match {
case config: MemoryLevelConfig =>
implicit val forceSaveApplier: ForceSaveApplier = ForceSaveApplier.Disabled

Level(
bloomFilterConfig = BloomFilterBlock.Config.disabled,
hashIndexConfig = block.hashindex.HashIndexBlock.Config.disabled,
Expand All @@ -206,6 +208,8 @@ private[core] object CoreInitializer extends LazyLogging {
)

case config: PersistentLevelConfig =>
implicit val forceSaveApplier: ForceSaveApplier = ForceSaveApplier.DefaultApplier

Level(
bloomFilterConfig = BloomFilterBlock.Config(config = config.mightContainKeyIndex),
hashIndexConfig = block.hashindex.HashIndexBlock.Config(config = config.randomKeyIndex),
Expand All @@ -231,6 +235,8 @@ private[core] object CoreInitializer extends LazyLogging {
previousLowerLevel: Option[NextLevel]): IO[swaydb.Error.Level, Core[Bag.Less]] =
levelConfigs match {
case Nil =>
implicit val forceSaveApplier: ForceSaveApplier = ForceSaveApplier.DefaultApplier

createLevel(
id = 1,
nextLevel = previousLowerLevel,
Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/swaydb/core/actor/ByteBufferSweeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import com.typesafe.scalalogging.LazyLogging
import swaydb.Error.IO.ExceptionHandler
import swaydb._
import swaydb.core.actor.ByteBufferCleaner.Cleaner
import swaydb.core.io.file.Effect
import swaydb.core.io.file.{Effect, ForceSaveApplier}
import swaydb.data.cache.{Cache, CacheNoIO}
import swaydb.data.config.ActorConfig.QueueOrder
import swaydb.data.config.ForceSave
Expand Down Expand Up @@ -74,14 +74,15 @@ private[core] object ByteBufferSweeper extends LazyLogging {
hasReference: () => Boolean,
forced: AtomicBoolean,
filePath: Path,
forceSave: ForceSave.MMAPFiles): Clean =
forceSave: ForceSave.MMAPFiles)(implicit forceSaveApplier: ForceSaveApplier): Clean =
new Clean(
buffer = buffer,
filePath = filePath,
isRecorded = false,
hasReference = hasReference,
forced = forced,
forceSave = forceSave,
forceSaveApplier = forceSaveApplier,
//this id is being used instead of HashCode because nio.FileChannel returns
//the same MappedByteBuffer even after the FileChannel is closed.
id = idGenerator.incrementAndGet()
Expand All @@ -103,6 +104,7 @@ private[core] object ByteBufferSweeper extends LazyLogging {
hasReference: () => Boolean,
forced: AtomicBoolean,
forceSave: ForceSave.MMAPFiles,
forceSaveApplier: ForceSaveApplier,
id: Long) extends FileCommand {
override def name: String = s"Clean: $filePath"
}
Expand Down Expand Up @@ -273,6 +275,8 @@ private[core] object ByteBufferSweeper extends LazyLogging {
def initCleanerAndPerformClean(state: State,
buffer: MappedByteBuffer,
command: Command.Clean): IO[swaydb.Error.IO, State] = {
implicit val applier = command.forceSaveApplier

state.cleaner match {
case Some(cleaner) =>
IO {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/swaydb/core/io/file/ChannelFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import swaydb.data.slice.Slice
private[file] object ChannelFile {
def write(path: Path,
blockCacheFileId: Long,
forceSave: ForceSave.ChannelFiles): ChannelFile = {
forceSave: ForceSave.ChannelFiles)(implicit forceSaveApplier: ForceSaveApplier): ChannelFile = {
val channel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
new ChannelFile(
path = path,
Expand All @@ -48,7 +48,7 @@ private[file] object ChannelFile {
}

def read(path: Path,
blockCacheFileId: Long): ChannelFile =
blockCacheFileId: Long)(implicit forceSaveApplier: ForceSaveApplier): ChannelFile =
if (Effect.exists(path)) {
val channel = FileChannel.open(path, StandardOpenOption.READ)
new ChannelFile(
Expand Down
21 changes: 14 additions & 7 deletions core/src/main/scala/swaydb/core/io/file/DBFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ object DBFile extends LazyLogging {
file: Option[DBFileType],
blockCacheFileId: Long,
autoClose: Boolean)(implicit fileSweeper: FileSweeperActor,
bufferCleaner: ByteBufferSweeperActor) = {
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier) = {

//We need to create a single FileSweeperItem that can be
//re-submitted to fileSweeper every time this file requires a close Actor request.
Expand Down Expand Up @@ -126,7 +127,8 @@ object DBFile extends LazyLogging {
autoClose: Boolean,
forceSave: ForceSave.ChannelFiles)(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile = {
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile = {
val file = ChannelFile.write(path, blockCacheFileId, forceSave)
new DBFile(
path = path,
Expand Down Expand Up @@ -154,7 +156,8 @@ object DBFile extends LazyLogging {
blockCacheFileId: Long,
checkExists: Boolean = true)(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile =
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile =
if (checkExists && Effect.notExists(path))
throw swaydb.Exception.NoSuchFile(path)
else {
Expand Down Expand Up @@ -186,7 +189,8 @@ object DBFile extends LazyLogging {
blockCacheFileId: Long,
bytes: Iterable[Slice[Byte]])(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile = {
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile = {
val totalWritten =
bytes.foldLeft(0) { //do not write bytes if the Slice has empty bytes.
case (written, bytes) =>
Expand Down Expand Up @@ -219,7 +223,8 @@ object DBFile extends LazyLogging {
blockCacheFileId: Long,
bytes: Slice[Byte])(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile =
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile =
//do not write bytes if the Slice has empty bytes.
if (!bytes.isFull) {
throw swaydb.Exception.FailedToWriteAllBytes(0, bytes.size, bytes.size)
Expand All @@ -246,7 +251,8 @@ object DBFile extends LazyLogging {
blockCacheFileId: Long,
checkExists: Boolean = true)(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile =
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile =
if (checkExists && Effect.notExists(path)) {
throw swaydb.Exception.NoSuchFile(path)
} else {
Expand Down Expand Up @@ -278,7 +284,8 @@ object DBFile extends LazyLogging {
deleteAfterClean: Boolean,
forceSave: ForceSave.MMAPFiles)(implicit fileSweeper: FileSweeperActor,
blockCache: Option[BlockCache.State],
bufferCleaner: ByteBufferSweeperActor): DBFile = {
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): DBFile = {
val file =
MMAPFile.write(
path = path,
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/scala/swaydb/core/io/file/ForceSaveApplier.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,18 @@ trait ForceSaveApplier {

object ForceSaveApplier extends LazyLogging {

implicit object DefaultApplier extends ForceSaveApplier {
object Disabled extends ForceSaveApplier {
override def beforeClean(path: Path, buffer: MappedByteBuffer, forced: AtomicBoolean, forceSave: ForceSave.MMAPFiles): Unit =
logger.error(s"Disabled ForceSaveApplier beforeClean - $path", new Exception("Disabled ForceSaveApplier"))

override def beforeCopy(file: DBFile, toPath: Path, forceSave: ForceSave): Unit =
logger.error(s"Disabled ForceSaveApplier beforeCopy - ${file.path} - toPath - $toPath", new Exception("Disabled ForceSaveApplier"))

override def beforeClose[F <: DBFileType](file: F, forceSave: ForceSave): Unit =
logger.error(s"Disabled ForceSaveApplier beforeClose - ${file.path}", new Exception("Disabled ForceSaveApplier"))
}

object DefaultApplier extends ForceSaveApplier {

/**
* Applies forceSave condition for before cleaning the file.
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/swaydb/core/io/file/MMAPFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import java.nio.{BufferOverflowException, MappedByteBuffer}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import com.typesafe.scalalogging.LazyLogging
import swaydb.IO
import swaydb.core.actor.ByteBufferSweeper
import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.data.Reserve
Expand All @@ -46,7 +45,8 @@ private[file] object MMAPFile {
bufferSize: Long,
blockCacheFileId: Long,
deleteAfterClean: Boolean,
forceSave: ForceSave.MMAPFiles)(implicit cleaner: ByteBufferSweeperActor): MMAPFile =
forceSave: ForceSave.MMAPFiles)(implicit cleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): MMAPFile =
MMAPFile(
path = path,
channel = FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW),
Expand All @@ -59,7 +59,8 @@ private[file] object MMAPFile {

def read(path: Path,
blockCacheFileId: Long,
deleteAfterClean: Boolean)(implicit cleaner: ByteBufferSweeperActor): MMAPFile = {
deleteAfterClean: Boolean)(implicit cleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): MMAPFile = {
val channel = FileChannel.open(path, StandardOpenOption.READ)

MMAPFile(
Expand All @@ -79,7 +80,8 @@ private[file] object MMAPFile {
bufferSize: Long,
blockCacheFileId: Long,
deleteAfterClean: Boolean,
forceSave: ForceSave.MMAPFiles)(implicit cleaner: ByteBufferSweeperActor): MMAPFile = {
forceSave: ForceSave.MMAPFiles)(implicit cleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): MMAPFile = {
val buff = channel.map(mode, 0, bufferSize)
new MMAPFile(
path = path,
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/swaydb/core/level/Level.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import swaydb.core.actor.MemorySweeper
import swaydb.core.data.{KeyValue, _}
import swaydb.core.function.FunctionStore
import swaydb.core.io.file.Effect._
import swaydb.core.io.file.{BlockCache, Effect, FileLocker}
import swaydb.core.io.file.{BlockCache, Effect, FileLocker, ForceSaveApplier}
import swaydb.core.level.seek._
import swaydb.core.map.serializer._
import swaydb.core.map.{Map, MapEntry}
Expand Down Expand Up @@ -109,7 +109,8 @@ private[core] object Level extends LazyLogging {
keyValueMemorySweeper: Option[MemorySweeper.KeyValue],
blockCache: Option[BlockCache.State],
fileSweeper: FileSweeperActor,
bufferCleaner: ByteBufferSweeperActor): IO[swaydb.Error.Level, Level] = {
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier): IO[swaydb.Error.Level, Level] = {
//acquire lock on folder
acquireLock(levelStorage) flatMap {
lock =>
Expand Down Expand Up @@ -344,7 +345,8 @@ private[core] case class Level(dirs: Seq[Dir],
val blockCache: Option[BlockCache.State],
val segmentIDGenerator: IDGenerator,
val segmentIO: SegmentIO,
reserve: ReserveRange.State[Unit]) extends NextLevel with LazyLogging { self =>
reserve: ReserveRange.State[Unit],
val forceSaveApplier: ForceSaveApplier) extends NextLevel with LazyLogging { self =>

logger.info(s"{}: Level started.", pathDistributor)

Expand Down
20 changes: 17 additions & 3 deletions core/src/main/scala/swaydb/core/level/tool/AppendixRepairer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import swaydb.core.actor.ByteBufferSweeper.ByteBufferSweeperActor
import swaydb.core.actor.FileSweeper.FileSweeperActor
import swaydb.core.actor.MemorySweeper
import swaydb.core.function.FunctionStore
import swaydb.core.io.file.Effect
import swaydb.core.io.file.{Effect, ForceSaveApplier}
import swaydb.core.level.AppendixSkipListMerger
import swaydb.core.map.serializer.MapEntryWriter
import swaydb.core.map.{Map, MapEntry, SkipListMerger}
Expand All @@ -60,6 +60,7 @@ private[swaydb] object AppendixRepairer extends LazyLogging {
implicit val memorySweeper = Option.empty[MemorySweeper.KeyValue]
//mmap is false. FIXME - use ByteBufferCleaner.Disabled instead
implicit val bufferCleaner: ByteBufferSweeperActor = null
implicit val forceSaveApplier = ForceSaveApplier.DefaultApplier

IO(Effect.files(levelPath, Extension.Seg)) flatMap {
files =>
Expand All @@ -77,7 +78,8 @@ private[swaydb] object AppendixRepairer extends LazyLogging {
blockCache = None,
keyValueMemorySweeper = memorySweeper,
fileSweeper = fileSweeper,
bufferCleaner = bufferCleaner
bufferCleaner = bufferCleaner,
forceSaveApplier = forceSaveApplier
)
}
}
Expand Down Expand Up @@ -173,16 +175,28 @@ private[swaydb] object AppendixRepairer extends LazyLogging {
timeOrder: TimeOrder[Slice[Byte]],
fileSweeper: FileSweeperActor,
bufferCleaner: ByteBufferSweeperActor,
forceSaveApplier: ForceSaveApplier,
functionStore: FunctionStore,
writer: MapEntryWriter[MapEntry.Put[Slice[Byte], Segment]],
skipListMerger: SkipListMerger[SliceOption[Byte], SegmentOption, Slice[Byte], Segment]): IO[swaydb.Error.Level, Unit] =
IO {
Effect.walkDelete(appendixDir)

val mmap =
MMAP.Disabled(
forceSave =
ForceSave.BeforeClose(
enableBeforeCopy = true,
enableForReadOnly = true,
logBenchmark = false
)
)

Map.persistent[SliceOption[Byte], SegmentOption, Slice[Byte], Segment](
nullKey = Slice.Null,
nullValue = Segment.Null,
folder = appendixDir,
mmap = MMAP.Disabled(ForceSave.Disabled),
mmap = mmap,
flushOnOverflow = true,
fileSize = 1.gb
)
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/swaydb/core/level/zero/LevelZero.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import swaydb.core.data.KeyValue.{Put, PutOption}
import swaydb.core.data.Value.FromValue
import swaydb.core.data._
import swaydb.core.function.FunctionStore
import swaydb.core.io.file.{Effect, FileLocker}
import swaydb.core.io.file.{Effect, FileLocker, ForceSaveApplier}
import swaydb.core.level.seek._
import swaydb.core.level.{LevelRef, LevelSeek, NextLevel}
import swaydb.core.map
Expand Down Expand Up @@ -71,7 +71,8 @@ private[core] object LevelZero extends LazyLogging {
throttle: LevelZeroMeter => FiniteDuration)(implicit keyOrder: KeyOrder[Slice[Byte]],
timeOrder: TimeOrder[Slice[Byte]],
bufferCleaner: ByteBufferSweeperActor,
functionStore: FunctionStore): IO[swaydb.Error.Level, LevelZero] = {
functionStore: FunctionStore,
forceSaveApplier: ForceSaveApplier): IO[swaydb.Error.Level, LevelZero] = {
import swaydb.core.map.serializer.LevelZeroMapEntryReader.Level0Reader
import swaydb.core.map.serializer.LevelZeroMapEntryWriter._

Expand All @@ -97,6 +98,7 @@ private[core] object LevelZero extends LazyLogging {
timeOrder = timeOrder,
functionStore = functionStore,
bufferCleaner = bufferCleaner,
forceSaveApplier = forceSaveApplier,
writer = TimerMapEntryWriter.TimerPutMapEntryWriter,
reader = TimerMapEntryReader.TimerPutMapEntryReader)
} else {
Expand Down Expand Up @@ -156,6 +158,7 @@ private[core] object LevelZero extends LazyLogging {
timeOrder = timeOrder,
functionStore = functionStore,
bufferCleaner = bufferCleaner,
forceSaveApplier = forceSaveApplier,
writer = TimerMapEntryWriter.TimerPutMapEntryWriter,
reader = TimerMapEntryReader.TimerPutMapEntryReader)

Expand Down
Loading

0 comments on commit 5fa4ac1

Please sign in to comment.