Skip to content

Commit f4ba5bf

Browse files
committed
rx (feature): Add Rx.runOn for side-effect
1 parent 9217db0 commit f4ba5bf

File tree

3 files changed

+116
-6
lines changed

3 files changed

+116
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package wvlet.airframe.http.client
15+
16+
import wvlet.airframe.http.HttpMessage.Response
17+
import wvlet.airframe.http.{Http, HttpMessage, HttpStatus, RPCMethod, RPCStatus, RxHttpEndpoint, RxHttpFilter}
18+
import wvlet.airframe.rx.Rx
19+
import wvlet.airframe.surface.Surface
20+
import wvlet.airspec.AirSpec
21+
22+
import java.util.concurrent.atomic.AtomicInteger
23+
import scala.util.{Failure, Success}
24+
25+
object RPCErrorHandlingTest extends AirSpec {
26+
27+
private class DummyHttpChannel extends HttpChannel {
28+
private val requestCount = new AtomicInteger(0)
29+
30+
override def send(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Response = ???
31+
override def sendAsync(req: HttpMessage.Request, channelConfig: HttpChannelConfig): Rx[Response] = {
32+
requestCount.getAndIncrement() match {
33+
case 0 =>
34+
// For retry test
35+
Rx.single(Http.response(HttpStatus.InternalServerError_500))
36+
case _ =>
37+
Rx.single(RPCStatus.INVALID_REQUEST_U1.newException("RPC error").toResponse)
38+
}
39+
}
40+
override def close(): Unit = {}
41+
}
42+
43+
initDesign {
44+
_.bind[AsyncClient].toInstance {
45+
val config = Http.client // .withClientFilter(null)
46+
new AsyncClientImpl(new DummyHttpChannel, config)
47+
}
48+
}
49+
50+
test("test error response") { (client: AsyncClient) =>
51+
val dummyRPCMerthod =
52+
RPCMethod("/rpc_method", "demo.RPCClass", "hello", Surface.of[Map[String, Any]], Surface.of[String])
53+
54+
var observedRetry = false
55+
val myClient = client.withClientFilter(new RxHttpFilter {
56+
override def apply(request: HttpMessage.Request, next: RxHttpEndpoint): Rx[Response] = {
57+
next(request).runOn {
58+
case Success(x) =>
59+
x.status match {
60+
case HttpStatus.InternalServerError_500 =>
61+
observedRetry = true
62+
case _ =>
63+
}
64+
case Failure(e) =>
65+
logger.info(e)
66+
}
67+
}
68+
})
69+
70+
myClient.rpc[Map[String, Any], String](dummyRPCMerthod, Map("message" -> "world")).transform {
71+
case scala.util.Failure(e) =>
72+
observedRetry shouldBe true
73+
e.getMessage shouldContain "RPC error"
74+
case _ =>
75+
fail("should fail")
76+
}
77+
}
78+
}

airframe-rx/src/main/scala/wvlet/airframe/rx/Rx.scala

+23-4
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
package wvlet.airframe.rx
1515

1616
import java.util.concurrent.TimeUnit
17-
import wvlet.airframe.rx.Rx.{RecoverOp, RecoverWithOp}
1817
import wvlet.log.LogSupport
19-
2018
import scala.concurrent.{ExecutionContext, Future}
2119
import scala.language.higherKinds
2220
import scala.util.{Failure, Success, Try}
@@ -26,19 +24,39 @@ import scala.util.{Failure, Success, Try}
2624
* @tparam A
2725
*/
2826
trait RxOps[+A] { self =>
27+
import Rx._
28+
2929
def parents: Seq[RxOps[_]]
3030

3131
def toRx: Rx[A]
3232

3333
/**
3434
* Recover from a known error and emit a replacement value
3535
*/
36-
def recover[U](f: PartialFunction[Throwable, U]): Rx[U] = RecoverOp(this.toRx, f)
36+
def recover[U](f: PartialFunction[Throwable, U]): Rx[U] = RecoverOp(this, f)
3737

3838
/**
3939
* Recover from a known error and emit replacement values from a given Rx
4040
*/
41-
def recoverWith[A](f: PartialFunction[Throwable, RxOps[A]]): Rx[A] = RecoverWithOp(this.toRx, f)
41+
def recoverWith[A](f: PartialFunction[Throwable, RxOps[A]]): Rx[A] = RecoverWithOp(this, f)
42+
43+
/**
44+
* A utility method for running the given effect function when the Rx value is available and bypass the original
45+
* input value as is.
46+
*
47+
* This method is useful for debugging Rx chains. For example:
48+
* {{{
49+
* rx.runOn {
50+
* case Success(v) => debug(s"received ${v}")
51+
* case Failure(e) => error(s"request failed", e)
52+
* }
53+
* }}}
54+
*
55+
* @param f
56+
* @return
57+
* the original input value as is
58+
*/
59+
def runOn(f: PartialFunction[Try[A], Unit]): Rx[A] = RunOnOp(this, f)
4260

4361
/**
4462
* Evaluate this Rx[A] and apply the given effect function. Once OnError(e) or OnCompletion is observed, it will stop
@@ -416,6 +434,7 @@ object Rx extends LogSupport {
416434
}
417435
case class RecoverOp[A, U](input: RxOps[A], f: PartialFunction[Throwable, U]) extends UnaryRx[A, U]
418436
case class RecoverWithOp[A, U](input: RxOps[A], f: PartialFunction[Throwable, RxOps[U]]) extends UnaryRx[A, U]
437+
case class RunOnOp[A](input: RxOps[A], f: PartialFunction[Try[A], Unit]) extends UnaryRx[A, A]
419438

420439
case class IntervalOp(interval: Long, unit: TimeUnit) extends Rx[Long] {
421440
override def parents: Seq[RxOps[_]] = Seq.empty

airframe-rx/src/main/scala/wvlet/airframe/rx/RxRunner.scala

+15-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@ package wvlet.airframe.rx
22

33
import java.util.concurrent.TimeUnit
44
import java.util.concurrent.atomic.AtomicBoolean
5-
65
import wvlet.log.LogSupport
76

87
import scala.annotation.tailrec
98
import scala.collection.immutable.Queue
109
import scala.util.{Failure, Success, Try}
11-
import Rx._
1210

1311
/**
1412
* States for propagating the result of the downstream operators.
@@ -75,6 +73,8 @@ class RxRunner(
7573
continuous: Boolean
7674
) extends LogSupport { runner =>
7775

76+
import Rx._
77+
7878
/**
7979
* Build an executable chain of Rx operators. The resulting chain will be registered as a subscriber to the root node
8080
* (see RxVar.foreach). If the root value changes, the effect code block will be executed.
@@ -459,6 +459,19 @@ class RxRunner(
459459
Cancelable { () =>
460460
c1.cancel; c2.cancel
461461
}
462+
case RunOnOp(in, f) =>
463+
val f0 = f.asInstanceOf[PartialFunction[Try[_], Unit]]
464+
run(in) { ev =>
465+
ev match {
466+
case OnNext(v) =>
467+
// Skip the error handling so as not to discard the input event
468+
Try(f0.applyOrElse(Success(v), (_: Try[_]) => ()))
469+
case OnError(e) =>
470+
Try(f0.applyOrElse(Failure(e), (_: Try[_]) => ()))
471+
case _ =>
472+
}
473+
effect(ev)
474+
}
462475
case SingleOp(v) =>
463476
Try(effect(OnNext(v.eval))) match {
464477
case Success(c) =>

0 commit comments

Comments
 (0)