Skip to content

Commit

Permalink
Implement optional parallel log bootstrap for segmented logs (#158)
Browse files Browse the repository at this point in the history
* Explore parallelization

* Change package

* Refactor and add configuration options

* Fix Scala 2.11/2.12 parallel helpers

* Remove hopefully unnecessary dependency and reset Scala version to 2.13

* Quick sanity check unit test

* Fix parallelization
  • Loading branch information
HaloFour authored Jan 16, 2025
1 parent 2ba8dac commit f270f66
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 14 deletions.
9 changes: 9 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ libraryDependencies ++= {
)
}

libraryDependencies ++= {
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, major)) if major <= 12 =>
Seq()
case _ =>
Seq("org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4")
}
}

// Set the artifact names.
artifactName := { (scalaVersion: ScalaVersion, module: ModuleID, artifact: Artifact) =>
artifact.`type` match {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.comcast.xfinity.sirius.uberstore.segmented

import scala.collection.parallel.ParSeq

object ParallelHelpers {
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.comcast.xfinity.sirius.uberstore.segmented

import scala.collection.parallel.ParSeq

object ParallelHelpers {
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.comcast.xfinity.sirius.uberstore.segmented

import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ParSeq

object ParallelHelpers {
def parallelize[T](seq: Seq[T]): ParSeq[T] = seq.par
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ object SiriusConfiguration {
* Maximum akka message size in KB. Default is 1024. Type is Integer.
*/
final val MAX_AKKA_MESSAGE_SIZE_KB = "sirius.akka.maximum-frame-size-kb"

/**
* Whether or not to bootstrap the log in parallel, only applies to segmented uberstores
*/
final val LOG_PARALLEL_ENABLED = "sirius.log.parallel-enabled"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,34 @@ class StateSup(requestHandler: RequestHandler,
bootstrapTime = Some(0L)

case _ =>
val parallel = config.getProp(SiriusConfiguration.LOG_PARALLEL_ENABLED, default = false)
val start = System.currentTimeMillis
logger.info("Beginning SiriusLog replay at {}", start)
requestHandler.onBootstrapStarting()
siriusLog.foreach(
orderedEvent =>
try {
orderedEvent.request match {
case Put(key, body) => requestHandler.handlePut(orderedEvent.sequence, key, body)
case Delete(key) => requestHandler.handleDelete(orderedEvent.sequence, key)
}
} catch {
case rte: RuntimeException =>
eventReplayFailureCount += 1
logger.error("Exception replaying {}: {}", orderedEvent, rte)
}
)
if (parallel) {
siriusLog.parallelForeach(bootstrapEvent)
} else {
siriusLog.foreach(bootstrapEvent)
}
requestHandler.onBootstrapComplete()
val totalBootstrapTime = System.currentTimeMillis - start
bootstrapTime = Some(totalBootstrapTime)
logger.info("Replayed SiriusLog in {}ms", totalBootstrapTime)
}
}

private def bootstrapEvent(orderedEvent : OrderedEvent): Unit =
try {
orderedEvent.request match {
case Put(key, body) => requestHandler.handlePut(orderedEvent.sequence, key, body)
case Delete(key) => requestHandler.handleDelete(orderedEvent.sequence, key)
}
} catch {
case rte: RuntimeException =>
eventReplayFailureCount += 1
logger.error("Exception replaying {}: {}", orderedEvent, rte)
}

trait StateInfoMBean {
def getEventReplayFailureCount: Long
def getBootstrapTime: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.comcast.xfinity.sirius.uberstore.segmented

import java.io.{File => JFile}

import better.files.File
import com.comcast.xfinity.sirius.api.SiriusConfiguration
import com.comcast.xfinity.sirius.api.impl.OrderedEvent
Expand Down Expand Up @@ -136,6 +135,11 @@ class SegmentedUberStore private[segmented] (base: JFile,
*/
def getNextSeq = nextSeq

override def parallelForeach[T](fun: OrderedEvent => T): Unit = {
ParallelHelpers.parallelize(readOnlyDirs :+ liveDir)
.foreach(_.foldLeftRange(0, Long.MaxValue)(())((_, e) => fun(e)))
}

/**
* @inheritdoc
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ trait SiriusLog {
*/
def foreach[T](fun: OrderedEvent => T): Unit = foldLeft(())((_, e) => fun(e))

/**
* Apply fun to each entry in the log in parallel and potentially out of order
*
* @param fun function to apply
*/
def parallelForeach[T](fun: OrderedEvent => T): Unit = foreach[T](fun)

/**
* Fold left across the log entries
* @param acc0 initial accumulator value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.comcast.xfinity.sirius.uberstore.segmented

import scala.collection.concurrent._

import com.comcast.xfinity.sirius.NiceTest
import java.io.{File => JFile}

Expand Down Expand Up @@ -195,6 +197,21 @@ class SegmentedUberStoreTest extends NiceTest {
}
}

describe("parallelForeach") {
it("should bootstrap the uberstore in parallel") {
createPopulatedSegment(dir, "1", Range.inclusive(1, 3).toList, isApplied = true)
createPopulatedSegment(dir, "2", Range.inclusive(4, 6).toList, isApplied = true)
createPopulatedSegment(dir, "3", Range.inclusive(7, 9).toList, isApplied = true)
val config = new SiriusConfiguration
config.setProp(SiriusConfiguration.LOG_PARALLEL_ENABLED, true)
uberstore = SegmentedUberStore(dir.getAbsolutePath, config)
val map = new TrieMap[Long, SiriusRequest]()
uberstore.parallelForeach(event => map.put(event.sequence, event.request))

assert(map.size == 9)
}
}

describe("close") {
it("should close all of the associated uberdirs") {
uberstore.close()
Expand Down

0 comments on commit f270f66

Please sign in to comment.