diff --git a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala index 3f13957..0d32b75 100644 --- a/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala +++ b/src/main/scala/com/hootsuite/circuitbreaker/CircuitBreaker.scala @@ -17,6 +17,8 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} * @param name the name of the circuit breaker * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset + * @param isExponentialBackoff indicating the retry delayed should be increased exponential on consecutive failures + * @param exponentialRetryCap limits the number of times the retryDelay will be increased exponentially, ignored if not exponential backoff * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures * @param isExceptionNotFailure partial function to allow users to determine exceptions which should not be considered failures * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) @@ -26,6 +28,8 @@ class CircuitBreaker private[circuitbreaker] ( val name: String, val failLimit: Int, val retryDelay: FiniteDuration, + val isExponentialBackoff: Boolean = false, + val exponentialRetryCap: Option[Int], val isResultFailure: PartialFunction[Any, Boolean] = { case _ => false }, val isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false }, val stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(), @@ -38,6 +42,8 @@ class CircuitBreaker private[circuitbreaker] ( builder.name, builder.failLimit, builder.retryDelay, + builder.isExponentialBackoff, + builder.exponentialRetryCap, builder.isResultFailure, builder.isExceptionNotFailure, builder.stateChangeListeners, @@ -180,9 +186,9 @@ class CircuitBreaker private[circuitbreaker] ( * @param currentState the expected current state * @return true when the state was changed, false when the given state was not the current state */ - def attemptResetBrokenState(currentState: BrokenState): Boolean = { + def attemptResetBrokenState(currentState: BrokenState, retryCount: Int): Boolean = { logger.debug(s"Circuit breaker \'$name\', attempting to reset open/broken state") - state.compareAndSet(currentState, new BrokenState(this)) + state.compareAndSet(currentState, new BrokenState(this, retryCount)) } /** @@ -286,7 +292,7 @@ private object CircuitBreaker { override def onFailure(): Unit = incrementFailure() - private[this] def incrementFailure() = { + private[this] def incrementFailure(): Unit = { val currentCount = failureCount.incrementAndGet logger.debug( s"Circuit breaker ${cb.name} increment failure count to $currentCount; fail limit is ${cb.failLimit}" @@ -298,8 +304,35 @@ private object CircuitBreaker { /** * CircuitBreaker is opened/broken. Invocations fail immediately. */ - class BrokenState(cb: CircuitBreaker) extends State { - val retryAt: Long = System.currentTimeMillis() + cb.retryDelay.toMillis + class BrokenState(cb: CircuitBreaker, retryCount: Int = 0) extends State { + val retryAt: Long = System.currentTimeMillis() + calcRetryDelay(cb) + + def calcRetryDelay(cb: CircuitBreaker): Long = { + + //calc jitter up to 1/10 the current retryDelay + //for exponential backoff + val jitter: Long = if (cb.isExponentialBackoff) { + (scala.util.Random.nextFloat() * cb.retryDelay.toMillis / 10).toLong + } else { + 0 + } + + val retryCap = cb.exponentialRetryCap match { + case Some(x) => x + case None => Int.MaxValue + } + + val exponent: Int = if (cb.isExponentialBackoff) { + Math.min(retryCount, retryCap) + } else 0 + + val result = (cb.retryDelay.toMillis + jitter) * Math.pow(2, exponent).toLong + + logger.debug(s"CB details retry delay details: jitter $jitter, " + + s"retryCap $retryCap, exponent $exponent delay $result") + + result + } override def preInvoke(): Unit = { cb.invocationListeners.foreach { listener => @@ -311,7 +344,7 @@ private object CircuitBreaker { } val retry = System.currentTimeMillis > retryAt - if (!(retry && cb.attemptResetBrokenState(this))) { + if (!(retry && cb.attemptResetBrokenState(this, this.retryCount + 1))) { throw new CircuitBreakerBrokenException( cb.name, s"Making ${cb.name} unavailable after ${cb.failLimit} errors" @@ -336,6 +369,8 @@ private object CircuitBreaker { * @param name the name of the circuit breaker * @param failLimit maximum number of consecutive failures before the circuit breaker is tripped (opened) * @param retryDelay duration until an open/broken circuit breaker lets a call through to verify whether or not it should be reset + * @param isExponentialBackoff indicating the retry delayed should be increased exponential on consecutive failures + * @param exponentialRetryCap limits the number of times the retryDelay will be increased exponentially, ignored if not exponential backoff * @param isResultFailure partial function to allow users to determine return cases which should be considered as failures * @param isExceptionNotFailure partial function to allow users to determine exceptions which should not be considered failures * @param stateChangeListeners listeners that will be notified when the circuit breaker changes state (open <--> closed) @@ -345,6 +380,8 @@ case class CircuitBreakerBuilder( name: String, failLimit: Int, retryDelay: FiniteDuration, + isExponentialBackoff: Boolean = false, + exponentialRetryCap: Option[Int] = Some(10), isResultFailure: PartialFunction[Any, Boolean] = { case _ => false }, isExceptionNotFailure: PartialFunction[Throwable, Boolean] = { case _ => false }, stateChangeListeners: List[CircuitBreakerStateChangeListener] = List(), diff --git a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala index f5cba6a..d7dd9f9 100644 --- a/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala +++ b/src/test/scala/com/hootsuite/circuitbreaker/CircuitBreakerTest.scala @@ -3,7 +3,7 @@ package com.hootsuite.circuitbreaker import com.hootsuite.circuitbreaker.listeners.{CircuitBreakerInvocationListener, CircuitBreakerStateChangeListener} import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Seconds, Span} -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{Assertion, FlatSpec, Matchers} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.{Duration, FiniteDuration} @@ -13,197 +13,339 @@ import scala.util.{Failure, Success} import java.util.concurrent.TimeUnit class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { + private val numMillisecondsForRetryDelay = 200L - object SimpleOperation { + // for whenReady calls + private implicit val defaultPatience: PatienceConfig = + PatienceConfig(timeout = Span(2, Seconds), interval = Span(10, Millis)) + private val defaultRetryDelay = Duration(numMillisecondsForRetryDelay, TimeUnit.MILLISECONDS) - def operation(x: Int, y: Int): Int = x / y + def assertArithException(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[ArithmeticException] + } + } - def asyncOperation(x: Int, y: Int): Future[Int] = Future { x / y } + def assertArithExceptionAsync(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(protectedAsyncOperation(1, 0).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[ArithmeticException] + } + } + + def protectedAsyncOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Future[Int] = cb.async() { + SimpleOperation.asyncOperation(x, y) } - // for whenReady calls - private implicit val defaultPatience = - PatienceConfig(timeout = Span(2, Seconds), interval = Span(10, Millis)) + def assertCircuitException(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[CircuitBreakerBrokenException] + } + } - private val numMillisecondsForRetryDelay = 200L - private val defaultRetryDelay = Duration(numMillisecondsForRetryDelay, TimeUnit.MILLISECONDS) + def assertCircuitExceptionAsync(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(protectedAsyncOperation(1, 0).failed, timeout(Span(timeoutLength, Millis))) { e => + withClue(s"${cb.name} : $hint") { + e shouldBe a[CircuitBreakerBrokenException] + } + } - private def waitUntilRetryDelayHasExpired() = Thread.sleep(2 * numMillisecondsForRetryDelay) + def assertClosed(hint: String, timeoutLength: Int = 100)(implicit cb: CircuitBreaker): Assertion = + whenReady(Future(protectedOperation(2, 1), timeout(Span(timeoutLength, Millis)))) { + case (result, _) => + withClue(s"${cb.name} : $hint") { + result shouldEqual 2 + } + } - private def simpleBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration) = - CircuitBreakerBuilder(name = name, failLimit = failLimit, retryDelay = retryDelay) + def protectedOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Int = cb() { + SimpleOperation.operation(x, y) + } + + private def waitUntilRetryDelayHasExpired(millis: Option[Long] = None): Unit = + millis match { + case Some(x) => Thread.sleep(x) + case None => Thread.sleep(2 * numMillisecondsForRetryDelay) + } // CB builders that misbehave on (state change|invocation) listeners - either throwing or blocking // + one plain CB for baseline - private def simpleBuilders( - failLimit: Int = 2, - retryDelay: FiniteDuration = defaultRetryDelay - ): List[CircuitBreakerBuilder] = List( - simpleBuilder("simple", failLimit, retryDelay), - simpleBuilder("invocation listeners both block", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + private def simpleBuilders(failLimit: Int = 2, + retryDelay: FiniteDuration = defaultRetryDelay + ): List[CircuitBreakerBuilder] = { + val defaultThreadSleep = 5000 - override def onInvocationInFlowState(name: String) = Thread.sleep(5000) + List( + simpleBuilder("simple", failLimit, retryDelay), + simpleBuilder("invocation listeners both block", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInBrokenState(name: String) = Thread.sleep(5000) - })), - simpleBuilder("invocation flow throws, invocation broken blocks", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + override def onInvocationInFlowState(name: String): Unit = Thread.sleep(defaultThreadSleep) + + override def onInvocationInBrokenState(name: String): Unit = Thread.sleep(defaultThreadSleep) + })), + simpleBuilder( + "invocation flow throws, invocation broken blocks", + failLimit, + retryDelay + ).withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInFlowState(name: String) = throw new Exception("boom") + override def onInvocationInFlowState(name: String): Unit = throw new Exception("boom") - override def onInvocationInBrokenState(name: String) = Thread.sleep(5000) + override def onInvocationInBrokenState(name: String): Unit = Thread.sleep(defaultThreadSleep) })), - simpleBuilder("invocation flow blocks, invocation broken blocks", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + simpleBuilder( + "invocation flow blocks, invocation broken blocks", + failLimit, + retryDelay + ).withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInFlowState(name: String) = Thread.sleep(5000) + override def onInvocationInFlowState(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onInvocationInBrokenState(name: String) = throw new Exception("boom") + override def onInvocationInBrokenState(name: String): Unit = throw new Exception("boom") })), - simpleBuilder("invocation listeners both throw", failLimit, retryDelay) - .withInvocationListeners(List(new CircuitBreakerInvocationListener { + simpleBuilder("invocation listeners both throw", failLimit, retryDelay) + .withInvocationListeners(List(new CircuitBreakerInvocationListener { - override def onInvocationInFlowState(name: String) = throw new Exception("boom") + override def onInvocationInFlowState(name: String): Unit = throw new Exception("boom") - override def onInvocationInBrokenState(name: String) = throw new Exception("boom") - })), - simpleBuilder("state change listeners all throw", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onInvocationInBrokenState(name: String): Unit = throw new Exception("boom") + })), + simpleBuilder("state change listeners all throw", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onInit(name: String) = throw new Exception("boom") + override def onInit(name: String): Unit = throw new Exception("boom") - override def onTrip(name: String) = throw new Exception("boom") + override def onTrip(name: String): Unit = throw new Exception("boom") - override def onReset(name: String) = throw new Exception("boom") - })), - simpleBuilder("state change listeners all block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onReset(name: String): Unit = throw new Exception("boom") + })), + simpleBuilder("state change listeners all block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onInit(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onTrip(name: String) = Thread.sleep(5000) + override def onTrip(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onReset(name: String): Unit = Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("state change onInit throws, onTrip, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onInit(name: String) = throw new Exception("boom") + override def onInit(name: String): Unit = throw new Exception("boom") - override def onTrip(name: String) = Thread.sleep(5000) + override def onTrip(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onTrip throws, onInit, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onReset(name: String): Unit = Thread.sleep(defaultThreadSleep) + })), + simpleBuilder( + "state change onTrip throws, onInit, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onInit(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onTrip(name: String) = throw new Exception("boom") + override def onTrip(name: String): Unit = throw new Exception("boom") - override def onReset(name: String) = Thread.sleep(5000) - })), - simpleBuilder("state change onReset throws, onInit, onReset block", failLimit, retryDelay) - .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { + override def onReset(name: String): Unit = Thread.sleep(defaultThreadSleep) + })), + simpleBuilder("state change onReset throws, onInit, onReset block", failLimit, retryDelay) + .withStateChangeListeners(List(new CircuitBreakerStateChangeListener { - override def onInit(name: String) = Thread.sleep(5000) + override def onInit(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onTrip(name: String) = Thread.sleep(5000) + override def onTrip(name: String): Unit = Thread.sleep(defaultThreadSleep) - override def onReset(name: String) = throw new Exception("boom") - })) - ) + override def onReset(name: String): Unit = throw new Exception("boom") + })) + ) + } - "simple circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { + private def simpleBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration) = - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + CircuitBreakerBuilder( + name = name, + failLimit = failLimit, + retryDelay = retryDelay + ) + + private def exponentialBuilders(failLimit: Int = 2, + retryDelay: FiniteDuration = defaultRetryDelay, + retryCap: Int = 10 + ): List[CircuitBreakerBuilder] = { + List( + exponentialBuilder("simple", failLimit, retryDelay, retryCap) + ) + } + + private def exponentialBuilder(name: String, failLimit: Int, retryDelay: FiniteDuration, + exponentialRetryCap: Int) = + + CircuitBreakerBuilder(name = name, + failLimit = failLimit, + retryDelay = retryDelay, + isExponentialBackoff = true, + exponentialRetryCap = Some(exponentialRetryCap) + ) + + object SimpleOperation { + + def operation(x: Int, y: Int): Int = x / y + + def asyncOperation(x: Int, y: Int): Future[Int] = Future { + x / y + } + } + + "simple circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x // run all protectedOperation invocations async, so we can catch & timeout rogue, blocking listeners // 100ms should be more than enough for our SimpleOperation // first failure; let it through - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : first failure") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("first failure") // second failure; let it through again but this should trip the circuit breaker - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : second failure") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("second failure") // now we get a circuit breaker exception because the circuit breaker is open - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB broken failure") //wait a bit waitUntilRetryDelayHasExpired() //circuit should now be closed and a valid operation should just work - whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => - withClue(s"${cb.name} : after retry delay") { - result shouldEqual 2 - } - } + assertClosed("after retry delay") } } - it should "remain in tripped state on repeated errors" in { - simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) + it should "wait exponentially longer before retrying if is an exponential backoff CB" in { + + val baseRetryTime = 30 + exponentialBuilders(retryDelay = Duration(baseRetryTime, TimeUnit.MILLISECONDS)) + .map(_.build()) + .foreach { x => + implicit val cb: CircuitBreaker = x + + //should fail on first exception + assertArithException("1st should be Arithmetic") + + //should fail on second exception because the circuit has not been tripped + assertArithException("2nd should be Arithmetic") + + //next attempt should be circuit because we have reached the fail limit of 2 + assertCircuitException("3rd should be circuit") + + //wait the normal amount of time + waitUntilRetryDelayHasExpired(Some(baseRetryTime)) + + //should fail on first exception + assertArithException("4th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("5th should be circuit") + + //wait a bit (this wont be enough, need to wait twice as long + waitUntilRetryDelayHasExpired(Some(baseRetryTime)) + + //next attempt should fail, we didn't wait long enough + assertCircuitException("6th should be circuit") + + //wait a lot longer + waitUntilRetryDelayHasExpired(Some(baseRetryTime * 2)) + + //circuit should now be closed and a valid operation should just work + assertClosed("after exponential retry delay") } + } + + it should "wait exponentially longer before retrying if is an exponential backoff CB but stop " + + "increasing at limit" in { + + val baseRetryTime = 100 + val baseWaitTime = baseRetryTime * 1.1 + val retryCap = 2 + exponentialBuilders(retryDelay = Duration(baseRetryTime, + TimeUnit.MILLISECONDS), retryCap = retryCap) + .map(_.build()) + .foreach { x => + implicit val cb: CircuitBreaker = x + + //should fail on first exception + assertArithException("1st should be Arithmetic") + + //should fail on second exception because the circuit has not been tripped + assertArithException("2nd should be Arithmetic") + + //next attempt should be circuit because we have reached the fail limit of 2 + assertCircuitException("3rd should be circuit") + + //wait the normal amount of time + waitUntilRetryDelayHasExpired(Some(baseRetryTime)) + + //should fail on first exception + assertArithException("4th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("5th should be circuit") + + //wait a bit (this wont be enough, need to wait 2^1x as long + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 2).toInt)) + + //should fail on first exception + assertArithException("6th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("7th should be circuit") + + //wait a bit longer this time 2^2x + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 4).toInt)) + + //should fail on first exception + assertArithException("8th should be Arithmetic") + + //next attempt should be circuit we are in a broken state, not a flow state + assertCircuitException("9th should be circuit") + + //wait a bit longer this time 2^2x + waitUntilRetryDelayHasExpired(Some((baseWaitTime * 4).toInt)) + + //if the cap was not used, we we have to wait 2^3x as long, so only waiting + //2^2x should be long enough + + //should fail on first exception + assertArithException("8th should be Arithmetic") - (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } } + } + + it should "remain in tripped state on repeated errors" in { + simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + + (1 to 2).foreach { i => assertArithException(s"failure #$i") } // the CB is now tripped // repeated calls will just fail immediately, regardless of call (1 to 100).foreach { i => - // this call is legal but CB is in tripped state - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : attempt #$i while CB tripped") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException(s"attempt #$i while CB tripped") } } } it should "be 'waiting' if it is in a tripped state, but the reset time delay has been reached" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x cb.isFlowing shouldBe true cb.isBroken shouldBe false cb.isWaiting shouldBe false - (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithException(s"failure #$i") } cb.isFlowing shouldBe false cb.isBroken shouldBe true @@ -219,86 +361,48 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "let call through to underlying function when reset time delay reached, even with error" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x - (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithException(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB broken failure") //wait until after the reset delay time has elapsed waitUntilRetryDelayHasExpired() // CB should let this call go through to test whether or not we need reset - will fail - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : test reset attempt after retry delay") { - e shouldBe a[ArithmeticException] - } - } + assertArithException("test reset attempt after retry delay") // back to tripped state but with retry delay now reset, fail immediately with CB exception - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB remain broken after failed reset attempt") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB remain broken after failed reset attempt", 10) } } "simple async circuit breaker" should "record failures, trip, then reset after delay time has elapsed" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x - (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithException(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("CB broken failure", 10) //wait a bit waitUntilRetryDelayHasExpired() //circuit should now be closed and a valid operation should just work - whenReady(protectedOperation(2, 1), timeout(Span(100, Millis))) { result => - withClue(s"${cb.name} : after retry delay") { - result shouldEqual 2 - } - } + assertClosed("after retry delay") } } it should "remain in tripped state on repeated errors" in { - simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders(retryDelay = Duration(2, TimeUnit.SECONDS)).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x - def syncProtectedOperation(x: Int, y: Int): Int = - Await.result(protectedOperation(x, y), Duration(1, TimeUnit.SECONDS)) + def syncProtectedOperation(x: Int, y: Int)(implicit cb: CircuitBreaker): Int = + Await.result(protectedAsyncOperation(x, y), Duration(1, TimeUnit.SECONDS)) (1 to 2).foreach { i => whenReady(Future(syncProtectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => @@ -323,22 +427,14 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "be 'waiting' if it is in a tripped state, but the reset time delay has been reached" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x cb.isFlowing shouldBe true cb.isBroken shouldBe false cb.isWaiting shouldBe false - (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithExceptionAsync(s"failure #$i") } cb.isFlowing shouldBe false cb.isBroken shouldBe true @@ -355,65 +451,41 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { it should "let call through to underlying function when reset time delay reached, even with error" in { - simpleBuilders().map(_.build()).foreach { cb => - def protectedOperation(x: Int, y: Int): Future[Int] = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + simpleBuilders().map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x - (1 to 2).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithExceptionAsync(s"failure #$i") } // now we get a circuit breaker exception because the circuit breaker is open - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB broken failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("CB Broken Failure") //wait until after the reset delay time has elapsed waitUntilRetryDelayHasExpired() // CB should let this call go through to test whether or not we need reset - will fail - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : test reset attempt after retry delay") { - e shouldBe a[ArithmeticException] - } - } + assertArithExceptionAsync("test reset attempt after retry delay") // back to tripped state but with retry delay now reset, fail immediately with CB exception - whenReady(protectedOperation(1, 0).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : CB remain broken after failed reset attempt") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("CB remain broken after failed reset attempt", 10) } } "circuit breaker with fallback value" should "return the fallback value when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + val theDefaultValue = 0 // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb(fallback = Some(Success(theDefaultValue))) { + def fallbackOperation(x: Int, y: Int) = cb(fallback = Some(Success(theDefaultValue))) { SimpleOperation.operation(x, y) } - (1 to 5).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 5).foreach { i => assertArithException(s"failure #$i") } // return the default value here, while the circuit is tripped - whenReady(Future(protectedOperation(222, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => + whenReady(Future(fallbackOperation(222, 1), timeout(Span(100, Millis)))) { + case (result, _) => withClue(s"${cb.name} : after retry delay") { result shouldEqual theDefaultValue } @@ -422,24 +494,20 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "throw a custom exception when configured as such and when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + class CustomException extends Throwable val customFailure = Failure(new CustomException) // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb(fallback = Some(customFailure)) { + def customExceptionOperation(x: Int, y: Int) = cb(fallback = Some(customFailure)) { SimpleOperation.operation(x, y) } - (1 to 5).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 5).foreach { i => assertArithException(s"failure #$i") } - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => + whenReady(Future(customExceptionOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => withClue(s"${cb.name} : custom fallback failure") { e shouldBe a[CustomException] } @@ -448,24 +516,20 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } "async circuit breaker with fallback value" should "return the fallback value when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x + val theDefaultValue = 0 // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb.async(fallback = Some(Success(theDefaultValue))) { + def fallbackOperation(x: Int, y: Int) = cb.async(fallback = Some(Success(theDefaultValue))) { SimpleOperation.asyncOperation(x, y) } - (1 to 5).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 5).foreach { i => assertArithException(s"failure #$i") } // return the default value here, while the circuit is tripped - whenReady(protectedOperation(222, 1), timeout(Span(100, Millis))) { result => + whenReady(fallbackOperation(222, 1), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : after retry delay") { result shouldEqual theDefaultValue } @@ -474,24 +538,19 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } it should "throw a custom exception when configured as such and when the circuit breaker is tripped" in { - simpleBuilders(failLimit = 5).map(_.build()).foreach { cb => + simpleBuilders(failLimit = 5).map(_.build()).foreach { x => + implicit val cb: CircuitBreaker = x class CustomException extends Throwable val customFailure = Failure(new CustomException) // a bit of an odd behaviour for the fallback in this case, but tests the basic idea - def protectedOperation(x: Int, y: Int) = cb.async(fallback = Some(customFailure)) { + def fallbackOperationWithCustomException(x: Int, y: Int) = cb.async(fallback = Some(customFailure)) { SimpleOperation.asyncOperation(x, y) } - (1 to 5).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 5).foreach { i => assertArithException(s"failure #$i") } - whenReady(protectedOperation(2, 1).failed, timeout(Span(10, Millis))) { e => + whenReady(fallbackOperationWithCustomException(2, 1).failed, timeout(Span(10, Millis))) { e => withClue(s"${cb.name} : custom fallback failure") { e shouldBe a[CustomException] } @@ -505,32 +564,25 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { case i: Int if i == 2 => true // whenever the returned value is 2, the circuit breaker should mark as a failure }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x whenReady(Future(protectedOperation(2, 1), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still returns the value but records it as a failure") { result shouldEqual 2 } } whenReady(Future(protectedOperation(16, 8), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still returns the value but records it as a failure, trip CB") { result shouldEqual 2 } } // circuit breaker is now open - whenReady(Future(protectedOperation(3, 1)).failed, timeout(Span(10, Millis))) { e => - // the underlying function is never called, so the return value doesn't matter at this point - withClue(s"${cb.name} : custom fallback failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("custom fallback failure") } } @@ -539,12 +591,16 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { .map(_.withResultFailureCases { case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb() { 1 } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb() { + 1 + } // this should not blow up with an exception whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still return result when failure function throws") { result shouldEqual 1 } @@ -558,30 +614,24 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { case i: Int if i == 2 => true // whenever the returned value is 2, the circuit breaker should mark as a failure }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x - whenReady(protectedOperation(2, 1), timeout(Span(100, Millis))) { result => + + whenReady(protectedAsyncOperation(2, 1), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : still returns the value but records it as a failure") { result shouldEqual 2 } } - whenReady(protectedOperation(16, 8), timeout(Span(100, Millis))) { result => + whenReady(protectedAsyncOperation(16, 8), timeout(Span(100, Millis))) { result => withClue(s"${cb.name} : still returns the value but records it as a failure, trip CB") { result shouldEqual 2 } } // circuit breaker is now open - whenReady(protectedOperation(3, 1).failed, timeout(Span(10, Millis))) { e => - // the underlying function is never called, so the return value doesn't matter at this point - withClue(s"${cb.name} : custom fallback failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("custom fallback failure", 10) } } @@ -591,8 +641,14 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation(): Future[Int] = cb.async() { Future { 1 } } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation(): Future[Int] = cb.async() { + Future { + 1 + } + } // this should not blow up with an exception whenReady(protectedOperation(), timeout(Span(100, Millis))) { result => @@ -606,30 +662,24 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "circuit breaker configured to ignore certain exceptions" should "not be tripped when these exceptions occur" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb() { - SimpleOperation.operation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x // the circuit is never tripped and just lets these exceptions through - (1 to 10).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 10).foreach { i => assertArithException(s"failure #$i") } } } it should "not ignore exceptions that have not be configured to be ignored" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => + .foreach { x => + implicit val cb: CircuitBreaker = x + def protectedOperation(x: Int, y: Int) = cb() { val ret = SimpleOperation.operation(x, y) if (ret == 3) throw new IllegalStateException("just here to verify this exception is not ignored") @@ -637,13 +687,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // the circuit is never tripped and just lets these exceptions through - (1 to 10).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : (non) failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 10).foreach { i => assertArithException(s"(non) failure #$i") } whenReady(Future(protectedOperation(3, 1)).failed, timeout(Span(100, Millis))) { e => withClue(s"${cb.name} : failure #1") { @@ -657,11 +701,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // CB is now tripped - whenReady(Future(protectedOperation(2, 1)).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : tripped failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitException("tripped failure", 10) } } @@ -671,12 +711,16 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb() { 1 } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb() { + 1 + } // this should not blow up with an exception whenReady(Future(protectedOperation(), timeout(Span(100, Millis)))) { - case (result, timeout) => + case (result, _) => withClue(s"${cb.name} : still return result when non-failure function throws") { result shouldEqual 1 } @@ -687,30 +731,24 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "async circuit breaker configured to ignore certain exceptions" should "not be tripped when these exceptions occur" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => - def protectedOperation(x: Int, y: Int) = cb.async() { - SimpleOperation.asyncOperation(x, y) - } + .foreach { x => + implicit val cb: CircuitBreaker = x // the circuit is never tripped and just lets these exceptions through - (1 to 10).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 10).foreach { i => assertArithExceptionAsync(s"failure #$i") } } } it should "not ignore exceptions that have not be configured to be ignored" in { simpleBuilders() .map(_.withNonFailureExceptionCases { - case e: ArithmeticException => true + case _: ArithmeticException => true }.build()) - .foreach { cb => + .foreach { x => + implicit val cb: CircuitBreaker = x + def protectedOperation(x: Int, y: Int) = cb.async() { Future { val ret = SimpleOperation.operation(x, y) @@ -721,13 +759,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // the circuit is never tripped and just lets these exceptions through - (1 to 10).foreach { i => - whenReady(protectedOperation(1, 0).failed, timeout(Span(100, Millis))) { e => - withClue(s"${cb.name} : (non) failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 10).foreach { i => assertArithExceptionAsync(s"failure #$i") } whenReady(protectedOperation(3, 1).failed, timeout(Span(100, Millis))) { e => withClue(s"${cb.name} : failure #1") { @@ -741,11 +773,7 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } // CB is now tripped - whenReady(protectedOperation(2, 1).failed, timeout(Span(10, Millis))) { e => - withClue(s"${cb.name} : tripped failure") { - e shouldBe a[CircuitBreakerBrokenException] - } - } + assertCircuitExceptionAsync("tripped failure", 10) } } @@ -755,8 +783,12 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { // the following is a terrible idea - don't do this; but if someone does, ignore and log case _ => throw new Exception("a dumb thing to do") }.build()) - .foreach { cb => - def protectedOperation() = cb.async() { Future(1) } + .foreach { x => + implicit val cb: CircuitBreaker = x + + def protectedOperation() = cb.async() { + Future(1) + } // this should not blow up with an exception whenReady(protectedOperation(), timeout(Span(100, Millis))) { result => @@ -780,21 +812,13 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { override def onReset(resourceName: String): Unit = promiseIllReset.success(true) } - val circuitBreaker = CircuitBreakerBuilder("notifyStateChangeListeners", 2, defaultRetryDelay) - .withStateChangeListeners(List(myStateChangeListener)) - .build() + implicit val circuitBreaker: CircuitBreaker = + CircuitBreakerBuilder("notifyStateChangeListeners", 2, defaultRetryDelay) + .withStateChangeListeners(List(myStateChangeListener)) + .build() - def protectedOperation(x: Int, y: Int) = circuitBreaker() { - SimpleOperation.operation(x, y) - } - (1 to 2).foreach { i => - whenReady(Future(protectedOperation(1, 0)).failed, timeout(Span(100, Millis))) { e => - withClue(s"${circuitBreaker.name} : failure #$i") { - e shouldBe a[ArithmeticException] - } - } - } + (1 to 2).foreach { i => assertArithException(s"failure #$i") } // notified async / on separate thread of CB being *tripped*, wait a bit for it whenReady(promiseIllTrip.future, timeout(Span(50, Millis))) { tripped => @@ -851,18 +875,13 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { } } - val circuitBreaker = CircuitBreakerBuilder("notifyInvocationListeners", 2, defaultRetryDelay) - .withInvocationListeners(List(myInvocationListener)) - .build() - - def protectedOperation(x: Int, y: Int) = circuitBreaker() { - SimpleOperation.operation(x, y) - } + implicit val circuitBreaker: CircuitBreaker = + CircuitBreakerBuilder("notifyInvocationListeners", 2, defaultRetryDelay) + .withInvocationListeners(List(myInvocationListener)) + .build() // 10 invocations in flow state - (1 to 10).foreach { _ => - protectedOperation(1, 1) - } + (1 to 10).foreach { _ => protectedOperation(1, 1) } whenReady(promiseCheckpoint1.future, timeout(Span(50, Millis))) { case (invoked, broken) => @@ -936,15 +955,17 @@ class CircuitBreakerTest extends FlatSpec with Matchers with ScalaFutures { "circuit breaker exception checking" should "let ControlThrowable exceptions through without affecting the state of the circuit breaker" in { val circuitBreaker = CircuitBreakerBuilder("controlThrowable", 2, defaultRetryDelay).build() - val protectedOperation = circuitBreaker() { new ControlThrowable {} } + val protectedOperation = circuitBreaker() { + new ControlThrowable {} + } // the circuit breaker is never tripped (1 to 5).foreach { _ => try { protectedOperation } catch { - case e: CircuitBreakerBrokenException => fail("the circuit breaker should not have been tripped") - case e: ControlThrowable => //cool + case _: CircuitBreakerBrokenException => fail("the circuit breaker should not have been tripped") + case _: ControlThrowable => //cool case e: Throwable => throw e } }