Skip to content

imrafaelmerino/vertx-effect

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

logo

Maven

vertx-effect manifesto

. The more verticles, the better.
. A verticle must do only one thing.
. Use persistent data structures.
. Systems will fail, be prepared.
. Simplicity matters.
. If there is a bug and you can't spot it quickly, then there are two bugs. Fix both of them.

How persistent data structures makes a difference working with actors

Vertx-effect embraces immutability and persistent data structures. That's why it uses vertx-values instead of the default Vert.x Json which is very inefficient.

vertx-effect in a few lines of code

public class MyModule extends VertxModule {

    public static Lambda<String, String> toLowerCase, toUpperCase;
    public static Lambda<Integer, Integer> inc;
    public static Lambda<JsObj, JsObj> validate, validateAndMap;

    @Override
    protected void deploy() {

        this.deploy("toLowerCase",
                    (String str) -> VIO.succeed(str.toLowerCase())
                   );
        this.deploy("toUpperCase",
                    (String str) -> VIO.succeed(str.toUpperCase())
                   );
        this.deploy("inc",
                    (Integer n) -> VIO.succeed(n + 1)
                   );

        // json-values uses specs to define the structure of a Json: {a:int,b:[str,str]} 
        JsObjSpec spec = JsObjSpec.of("a", integer(),
                                      "b", tuple(str(), str())
                                     );
        this.deploy("validate", Validators.validateJsObj(spec));

        Lambda<JsObj, JsObj> map = obj ->
                JsObjExp.par("a",
                             inc.apply(obj.getInt("a"))
                                .map(JsInt::of),
                             "b",
                             JsArrayExp.par(toLowerCase.apply(obj.getStr(path("/b/0")))
                                                       .map(JsStr::of),
                                            toUpperCase.apply(obj.getStr(path("/b/1")))
                                                       .map(JsStr::of)
                                           )
                            )
                        .retry(RetryPolicies.limitRetries(2));
        this.deploy("validateAnMap",
                    (JsObj obj) -> validate.apply(obj).then(map)
                   );

    }

    @Override
    protected void initialize() {

        toUpperCase = this.ask("toUpperCase");
        toLowerCase = this.ask("toLowerCase");
        inc = this.ask("inc");
        validate = this.ask("validate");
        validateAndMap = this.ask("validateAnMap");

    }
}

A module is a regular verticle that deploys other verticles and exposes lambdas to communicate with them. A lambda is just a function that takes an input and produces an output. In the above example, MyModule deploys five verticles. It's worth mentioning how the verticle ValidateAndMap is defined using composition and the expressions JsObjExp and JsArrayExp. It shows the essence and the goal of vertx-effect. Later on, we'll see more expressions like CondExp, SwitchExp, IfElseExp, AllExp, AnyExp, PairExp, TripleExp, ListExp, MapExp etc.

ValidateAndMap sends a message to validate. If the message matches the given spec, ValidateAndMap computes the output sending messages to the verticles inc, toLowerCase, and toUpperCase and composing a Json from their responses in parallel. You can operate sequentially instead of in parallel using the constructors JsObjExp.seq and JsArrayExp.sequential. Thanks to the retry function, if any verticle failed to compute their value, it would retry the computation up to two times.

It's important to notice that you can still send messages to the module verticles using the Vertx API, but one of the points of vertx-effect is to use functions for that.

Let's write some tests. Vertx doesn't support json-values, so we need to register a MessageCodec to send its persistent Json across the event bus.

import io.vertx.core.Vertx;
import io.vertx.junit5.*;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import jsonvalues.JsArray;
import jsonvalues.JsInt;
import jsonvalues.JsObj;
import vertx.effect.PairExp;
import vertx.effect.VertxRef;
import vertx.values.codecs.RegisterJsValuesCodecs;

@ExtendWith(VertxExtension.class)
public class TestMyModule {

    @BeforeAll
    // register a MessageCodec for json-values and deploy MyModule
    public static void prepare(final Vertx vertx,
                               final VertxTestContext context
                              ) {
        VertxRef ref = new VertxRef(vertx);
        PairExp.seq(ref.deployVerticle(new RegisterJsValuesCodecs()),
                    ref.deployVerticle(new MyModule())
                   )
               .onSuccess(ids -> context.completeNow())
               .get();
    }

    @Test
    public void empty_json_is_sent_and_failure_is_received(VertxTestContext context) {

        MyModule.validateAndMap.apply(JsObj.EMPTY)
                               .onComplete(result ->
                                                   context.verify(() -> {
                                                       Assertions.assertTrue(result.failed());
                                                       System.out.println(result.cause());
                                                       context.completeNow();
                                                   })
                                          )
                               .get();
    }

    @Test
    public void valid_json_is_sent_and_is_mapped_successfully(VertxTestContext context) {

        JsObj input = JsObj.of("a", JsInt.of(1), "b", JsArray.of("FOO", "foo"));

        JsObj expected = JsObj.of("a", JsInt.of(2), "b", JsArray.of("foo", "FOO"));

        MyModule.validateAndMap.apply(input)
                               .onSuccess(output -> {
                                   context.verify(() -> {
                                       Assertions.assertEquals(expected, output);
                                       context.completeNow();
                                   });
                               })
                               .get();
    }
}

Lambdas are just functions, so you can test them without deploying any verticle!


 Lambda<String, String> toLowerCase = str -> VIO.succeed(str.toLowerCase());

 Lambda<String, String> toUpperCase = str -> VIO.succeed(str.toUpperCase());

 Lambda<Integer, Integer> inc = n -> VIO.succeed(n+1);

 JsObjSpec spec = JsObjSpec.of("a", integer(), "b", tuple(str(), str()));

 Lambda<JsObj, JsObj> validate = Validators.validateJsObj(spec);

 Lambda<JsObj, JsObj> map = obj->
            JsObjExp.par("a", inc.apply(obj.getInt("a")).map(JsInt::of),
                         "b", JsArrayExp.par(toLowerCase.apply(obj.getStr(path("/b/0"))).map(JsStr::of),
                                             toUpperCase.apply(obj.getStr(path("/b/1"))).map(JsStr::of)
                                            )
                        );
 
 @Test
 public void valid_json_is_validated_and_mapped(VertxTestContext context) {

    JsObj input = JsObj.of("a", JsInt.of(1), "b", JsArray.of("FOO","foo"));
        
    JsObj expected = JsObj.of("a", JsInt.of(2), "b", JsArray.of("foo","FOO"));

    validate.then(map)
            .apply(input).get()
            .onComplete(result->{
                    context.verify(
                        () -> Assertions.assertTrue(result.succeeded() && result.result().equals(expected))
                    );
                    context.completeNow();
                });
    }

This is extremely convenient and productive to do your testing. You don't need to mock anything. Passing around functions that produce outputs given some inputs is enough to check that your verticles will do their job.

The takeaway from this section is how using function composition and different expressions, you'll be able to handle complexity and implement and test any imaginable flow very quickly.

Effects

Functional Programming is all about working with pure functions and values. That's all. One of the points where FP especially shines is dealing with effects. An effect is something you can't call twice unless you intended to:


Future<Customer> a = insertDb(customer);

Future<Customer> b = insertDb(customer);

Both calls can fail, or they can create two different customers, or one of them can fail, who knows. That code is not referentially transparent. For obvious reasons, you can't do the following refactoring:


Future<Customer> c = insertDb(customer)

Future<Customer> a = c;

Future<Customer> b = c;

A Vertx future represents an asynchronous effect. We don't want to block the event loop because of the latency of a computation. Haskell has proven to us how laziness is an essential property to stay pure. We need to define an immutable and lazy data structure that allows us to control the effect of latency.

Since Java 8, we have suppliers. They are indispensable to do FP in Java. Let's start defining what an effect is in vertx-effect:


import java.util.function.Supplier
import io.vertx.core.Future

public abstract class VIO<O> extends Supplier<Future<O>> {
    //from a constant
    public static VIO<O> succeed(O constant);
    
    //from a exception
    public static VIO<O> fail(Throwable error);
    
    // from an effect
    public static VIO<O> effect(Supplier<Future<O>> effect);
    
}

A VIO of type O is a supplier that will return a Vertx future of type O. It describes (and not execute) an asynchronous effect that will compute a value of type O.

If we turn Future into VIO in the previous example:


VIO<Customer> a = VIO.effect( () -> insertDb(customer) );

VIO<Customer> b = VIO.effect( () -> insertDb(customer) );

The above example is entirely equivalent to:


VIO<Customer> c = VIO.effect( ()->insertDb(customer) ); 

VIO<Customer> a = c;

VIO<Customer> b = c;

This property is fundamental. Whenever you see the expression insertDb(customer) in your program, you can think of it as it was c. Pure FP programming helps us reason about the programs we write. VIO is lazy. It's a description of an effect. In FP, we describe programs, and it's at the very last moment when they're executed.

I always wanted to name Lambda to something, and I finally got the chance!


import java.util.function.Function

public interface Lambda<I,O> extends Function<I, VIO<O>> { }

A lambda is a function that returns a VIO of type O given a type I. It models the communication with a verticle: a message is sent, the verticle receives and processes the message, and replies with a response. The message and the answer have to be of a type that can be sent across the event bus; otherwise, you must implement a MessageCodec.

Expressions

Using expressions and function composition is how we deal with complexity in functional programming. Let's go over the essential expressions in vertx-effect:

  • VIO constructors
VIO<String> str = VIO.succeed("hi"); 

VIO<Throwable> error = VIO.fail(new RuntimeException("something went wrong :("));

Future<JsObj> getProfile(final String id){...}
VIO<JsObj> profile = VIO.effect( () -> getProfile(id)); 

VIO<Long> realTime = VIO.effect(() -> System.currentTimeMillis() );
  • IfElseExp. If the condition is evaluated to true, it computes and returns the consequence; otherwise, the alternative.

import jio.IfElseExp;

 VIO<O> exp = IfElseExp.<O>predicate(VIO<Boolean> condition)
                       .consequence(Supplier<VIO<O>> consequence)
                       .alternative(Supplier<VIO<O>> alternative);

 VIO<O> exp = IfElseExp.<O>predicate(boolean condition)
                       .consequence(Supplier<VIO<O>> consequence)
                       .alternative(Supplier<VIO<O>> alternative);

The alternative and the consequence are lazy computations of IO effects.

  • SwitchExp. The switch construct implements multiple pattern-value branches. It evaluates an effect or value of type I and allows multiple clauses based on evaluating that value.

// matches a value of type I

 VIO<O> exp = 
  SwitchExp<I,O>.match(I value)
                .patterns(I pattern1, Lambda<I,O> lambda1,
                          I pattern2, Lambda<I,O> lambda2,           
                          I pattern3, Lambda<I,O> lambda3,
                          Lambda<I,O> otherwise
                          );      

// matches an effect of type I
                          
 VIO<O> exp = 
  SwitchExp<I,O>.match(VIO<I> value)
                .patterns(I pattern1, Lambda<I,O> lambda1,
                          I pattern2, Lambda<I,O> lambda2,           
                          I pattern3, Lambda<I,O> lambda3,
                          Lambda<I,O> otherwise
                          );                               
                          

// For example, the following expression reduces to "Wednesday"

 VIO<O> exp = 
  SwitchExp<Integer,String>.match(3)
                           .patterns(1, _ -> VIO.succeed("Monday"),
                                     2, _ -> VIO.succeed("Tuesday"),
                                     3, _ -> VIO.succeed("Wednesday"),
                                     4, _ -> VIO.succeed("Thursday"),
                                     5, _ -> VIO.succeed("Friday"),
                                     _ -> VIO.succeed("weekend")
                                     );

The same as before but using lists instead of constants as patterns.


 VIO<O> exp = SwitchExp<I,O>.match(I value)
                          .patterns(List<I> pattern1, Lambda<I,O> lambda1,
                                    List<I> pattern2, Lambda<I,O> lambda2,        
                                    List<I> pattern3, Lambda<I,O> lambda3,
                                    Lambda<I,O> otherwise
                                   );      
        
// For example, the following expression reduces to "third week"
 VIO<O> exp = 
  SwitchExp<Integer,String>.match(20)
                           .patterns(List.of(1, 2, 3, 4, 5, 6, 7), _ -> VIO.succeed("first week"),
                                     List.of(8, 9, 10, 11, 12, 13, 14), _ -> VIO.succeed("second week"),
                                     List.of(15, 16, 17, 18, 19, 20, 10), _ -> VIO.succeed("third week"),
                                     List.of(21, 12, 23, 24, 25, 26, 27), _ -> VIO.succeed("forth week"),
                                     _ -> VIO.succeed("last days of the month")
                                    );

Last but not least, you can use predicates as patterns instead of values or list of values:


 VIO<O> exp = 
  SwitchExp<I,O>.match(VIO<I> value)
                .patterns(Predicate<I> pattern1, Lambda<I,O> lambda1,
                          Predicate<I> pattern2, Lambda<I,O> lambda2,        
                          Predicate<I> pattern3, Lambda<I,O> lambda3,
                          Lambda<I,O> otherwise
                          );      
        
// For example, the following expression reduces to the default value, "greater or equal to twenty"

 VIO<O> exp = 
  SwitchExp<Integer,String>.match(IO.succeed(20))
                           .patterns(i -> i < 5 , _ -> VIO.succeed("lower than five"),
                                     i -> i < 10 , _ -> VIO.succeed("lower than ten"),
                                     i -> i < 20 , _ -> VIO.succeed("lower than twenty"),
                                     _ -> VIO.succeed("greater or equal to twenty")
                                    );
  • CondExp. It's a set of branches, and a default value. Each branch consists of an effect that computes a boolean (the condition) and its associated effect. The effect is computed and the expression reduced to its value if its condition is the first one in the list to be true. This means the order you place branches matters. If no condition is true, it computes the default effect, which is the last clause. You can compute all the conditions values either in parallel or sequentially.

 VIO<O> exp = CondExp.<O>seq(VIO<Boolean> cond1, Supplier<VIO<O>> effect1,
                             VIO<Boolean> cond2, Supplier<VIO<O>> effect2,
                             VIO<Boolean> cond3, Supplier<VIO<O>> effect3,
                             Supplier<VIO<O>> otherwise
                          );
                          
                          
 VIO<O> exp = CondExp.<O>par(VIO<Boolean> cond1, Supplier<VIO<O>> effect1,
                             VIO<Boolean> cond2, Supplier<VIO<O>> effect2,
                             VIO<Boolean> cond3, Supplier<VIO<O>> effect3,
                             Supplier<VIO<O>> otherwise
                          );                          
                        
  • AllExp and AnyExp. They are just idiomatic names for the boolean expressions And and Or. You can compute all the boolean effects either in parallel or sequentially.

 VIO<Boolean> all = AllExp.par(VIO<Boolean> cond1, VIO<Boolean> cond2, ....);
 VIO<Boolean> all = AllExp.seq(VIO<Boolean> cond1, VIO<Boolean> cond2, ....);

 VIO<Boolean> any = AnyExp.par(VIO<Boolean> cond1, VIO<Boolean> cond2, ...);
 VIO<Boolean> any = AnyExp.seq(VIO<Boolean> cond1, VIO<Boolean> cond2, ...);

  • PairExp. A pair is a tuple of two elements. Each element can be computed either in parallel or sequentially.

 VIO<Pair<A,B> pair = PairExp.par(VIO<A> val1, VIO<B> val2);

 VIO<Pair<A,B> pair = PairExp.seq(VIO<A> val1, VIO<B> val2);

You can race pairs when A and B have the same type, returning the first value that is computed:


 VIO<Pair<A,A> pair = PairExp.par(VIO<A> val1, VIO<A> val2);

 VIO<A> thefastest = PairExp.race(par);

  • TripleExp. A triple is a tuple of three elements. Each element can be computed either in parallel or sequentially.

 VIO<Triple<A,B,C> triple = TripleExp.par(VIO<A> val1, VIO<B> val2, VIO<C> val3);

 VIO<Triple<A,B,C> triple = TripleExp.seq(VIO<A> val1, VIO<B> val2, VIO<C> val3);

You can also race triples.

  • JsObjExp and JsArrayExp.

JsObjExp and JsArrayExp are data structures that look like raw Json. You can compute all the values either in parallel or sequentially. You can mix all the expressions we've seen so far and nest them, going as deep as necessary, like in the following example:


IfElseExp<JsStr> a = IfElseExp.<JsStr>predicate(VIO<Boolean> condition)
                                     .consequence(VIO<JsStr> consequence)
                                     .alternative(VIO<JsStr> alternative); 

JsArrayExp b = 
        JsArrayExp.seq(SwitchExp<Integer,JsValue>.match(n)
                                                 .patthers(1, Lambda<Insteger,JsValue> lambda1,
                                                           2, Lambda<Insteger,JsValue> lambda2,
                                                           Lambda<Insteger,JsValue> defaultLambda
                                                          ),
                      CondExp.par(VIO<Boolean> cond1, Supplier<IO<JsValue>> effect1,
                                  VIO<Boolean> cond2, Supplier<IO<JsValue>> effect2,
                                  Supplier<IO<JsValue>> defaultValue
                                )
                      );

JsObjExp c = 
       JsObjExp.par("d", AnyExp.seq(VIO<Boolean> cond3, VIO<Boolean> cond4)
                               .map(JsBool::of),
                    "e", AllExp.par(VIO<Boolean> cond5, VIO<Boolean> cond6)
                               .map(JsBool::of),
                    "f", JsArrayExp.par(VIO<JsValue> value1, VIO<JsValue> value2) 
                   )

JsObjExp exp = JsObjExp.par("a", a,
                            "b", b,
                            "c", c 
                           );
                           
JsObj json = exp.result();                          

It's important to notice that any value of the above expressions can be computed by different verticles deployed on different machine's of a cluster. Imagine ten machines collaborating to compute a JsObj. Isn't this amazing?

  • ListExp and MapExp

They represent sequences and maps. Modules use them internally. For example, the deploy method uses a MapExp to put the deployed verticles using their addresses as keys. They also use a ListExp when more than a verticle instance is deployed. As with the other expressions, you can compute their values either in parallel or sequentially.


MapExp<String> map = MapExp.par("a", VIO<String> value1,
                                "b", VIO<String> value2,
                                "c", VIO<String> value3
                                );

ListExp<Integer> seq = ListExp.par(VIO<Integer>, VIO<Integer>);
VIO<Integer> firstFinishing = seq.race();

The race function returns the value that finishes first. You can race a JsArrayExp as well.

Being reactive

Find below some of the most critical operations defined in the Val interface that will help us make our code more resilient:

import vertx.effect.RetryPolicy;

public interface VIO<O> extends Supplier<Future<O>> {
    VIO<O> retry(RetryPolicy policy);

    VIO<O> retry(Predicate<Throwable>,
                 RetryPolicy policy);

    VIO<O> repeat(Predicate<O> predicate,
                  RetryPolicy policy);

    VIO<O> recoverWith(Lambda<Throwable, O> fn);

    VIO<O> fallbackTo(Lambda<Throwable, O> fn);

    VIO<O> recoverWith(Lambda<Throwable, O> fn);
}

recoverWith: it switches to an alternative lambda when a failure happens.

fallbackTo: It's like recoverWith, but if the second lambda fails too, it returns the first one error.

recover: returns a constant if the computation fails.

retry: retries the computation if an error happens. You can define a predicate to retry only the specified errors. Retry policies are created in a very declarative and composable way, for example:

import static vertx.effect.RetryPolicies.*

Delay oneHundredMillis = vertxRef.sleep(Duration.ofMillis(100));
Delay oneSec = vertxRef.sleep(Duration.ofSeconds(1));

// up to five retries waiting 100 ms 
constantDelay(oneHundredMillis).append(limitRetries(5))

//during 3 seconds up to 10 times     
limitRetries(10).limitRetriesByCumulativeDelay(Duration.ofSeconds(3))    

//5 times without timer and then, if it keeps failing, an incremental timer from 100 ms up to 1 second
limiteRetries(5).followedBy(incrementalDelay(oneHundredMillis).capDelay(oneSec))

There are very interesting policies implemented based on this article: exponential backoff, full jitter, equal jitter, decorrelated jitter etc

repeat: When you get a not expected value (a failure) and want to repeat the computation. A predicate is specified to catch the failures. You can define any imaginable policy as well. Imagine you make a http request and you get a 500. That's not an error, it's a server failure. You can repeat the request according to a policy.

For expressions like Cond, Case, IfElse, All, Any, Pair, Triple, you can retry each value of the expression instead of the overall expresion with the methods:

 VIO<O> retryEach(RetryPolicy policy);

 VIO<O> retryEach(Predicate<Throwable>,
                  RetryPolicy policy);

Modules

In vertx-effect, a module is a special verticle whose purpose is to deploy other verticles and expose lambdas to communicate with them. Let's put an example.

import jsonvalues.JsObj;
import jsonvalues.JsStr;
import vertx.effect.VIO;
import vertx.effect.VertxModule;
import vertx.effect.Lambda;

public class MyModule extends VertxModule {

    private static final String REMOVE_NULL_ADDRESS = "removeNull";
    private static final String TRIM_ADDRESS = "trim";

    public static Lambda<JsObj, JsObj> removeNull;
    public static Lambda<JsObj, JsObj> trim;

    @Override
    public void deploy() {

        this.deploy(REMOVE_NULL_ADDRESS,
                    (JsObj o) -> VIO.succeed(o.filterAllValues(pair -> pair.value.isNotNull()))
                   );

        Function<JsValue, JsValue> trim = JsStr.prism.modify.apply(String::trim);
        this.deploy(TRIM_ADDRESS,
                    (JsObj o) -> VIO.succeed(o.mapAllValues(pair -> trim.apply(pair.value)))
                   );
    }

    @Override
    protected void initialize() {
        removeNull = this.ask(REMOVE_NULL_ADDRESS);
        trim = this.ask(TRIM_ADDRESS);
    }
}

We usually divide modules into four main blocks:

. The addresses where the module verticles will be listening on.
. The lambdas that are exposed to the outside world to communicate with the deployed verticles.
. The `deploy` method, where the module deploys the verticles.
. The `initialize` method, where the module initializes the lambdas.

In our example, we are using the persistent and immutable Json from json-values. The ask method returns a lambda to establish bidirectional communication with a verticle. In contrast, the tell method would return a consumer because a response is either not expected or ignored. Let's deploy our module and do some testing. We usually divide modules into four main blocks:

 @BeforeAll
 public static void prepare(final Vertx vertx,
                            final VertxTestContext context) 
 {
    VertxRef vertxRef = new VertxRef(vertx);

    // prints out events published by vertx-effect
    vertxRef.registerConsumer(EVENTS_ADDRESS, System.out::println); 

    Pair.seq(vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                    vertxRef.deployVerticle(new MyModule()) 
                   )
        .onSuccess(pair -> {
                            System.out.println(String.format("Ids deployed: %s and %s",
                                                             pair._1,
                                                             pair._2
                                                            )
                                              );
                            context.completeNow();
                           }
                  )
        .get();
 }

 @Test
 public void test_remove_and_then_trim(final VertxTestContext context)
 {
    Lambda<JsObj, JsObj> removeAndTrim = MyModule.removeNull.andThen(MyModule.trim);

    JsObj input = JsObj.of("a", JsStr.of("  hi  "),
                           "b", JsNull.NULL,
                           "c", JsObj.of("d", JsStr.of("  bye  "),
                                         "e", JsNull.NULL
                                        )
                          );

    JsObj expected = JsObj.of("a", JsStr.of("hi"),
                              "c", JsObj.of("d", JsStr.of("bye"))
                             );

    removeAndTrim.apply(input)
                 .onSuccess(it -> {
                     context.verify(()-> {
                                          Assertions.assertEquals(expected,it);
                                          context.completeNow();
                                         }
                                   );
                                  }
                           )
                .get();
 }

To send the persistent objects from json-values across the event bus, we need to register some codecs. The verticle RegisterJsValuesCodecs does this task. The VertxRef class is a wrapper around the Vertx instance to deploy and spawn verticles from lambdas. Modules use this class internally.

The VertxRef class is a wrapper around the Vertx instance to deploy and spawn verticles from lambdas. Modules use this class internally.

Logging

Logging is essential in software. There are many logging libraries. Sometimes it is not clear what dependencies you have to use because there isn't a standard solution. Each library uses its own. I didn't want to be opinionated. At the same time, I wanted to provide a simple and decouple solution to know what is going on in any system using vertx-effect. That's why I decided to publish remarkable events in a specific address. If you want to use your favorite slf4j implementation, just implement it in a consumer. On the other hand, consuming all those events during testing will give you instant feedback on your system and agility spotting bugs. You can disable this future with the Java system property -D"vertx.effect.enable.log.events"=false.

Publishing events

vertx-effect publishes events to the address vertx-effect-events. Find below some of the most important predefined events:

- VERTICLE_DEPLOYED
- VERTICLE_UNDEPLOYED
- MESSAGE_SENT
- MESSAGE_RECEIVED
- RESPONSE_REPLIED
- FAILURE_REPLIED
- RESPONSE_RECEIVED
- FAILURE_RECEIVED
- EXCEPTION_STARTING_VERTICLE
- EXCEPTION_STARTING_SHELL
- EXCEPTION_PROCESSING_MESSAGE
- EXCEPTION_UNDEPLOYING_VERTICLE
- EXCEPTION_DEPLOYING_VERTICLE
- TIMER_STARTED
- TIMER_ENDED

An example from the previous example would be:


{"event":"VERTICLE_DEPLOYED","address":"removeNull","instant":"2020-10-10T22:44:42.687633Z","id":"3de92ef8-777f-4110-aa45-442fc41900c6","thread":"vert.x-eventloop-thread-1"}
{"event":"VERTICLE_DEPLOYED","class":"vertx.effect.RegisterJsValuesCodecs","instant":"2020-10-10T22:44:42.682624Z","id":"73181043-ae38-4819-b7de-02f303fcc155","thread":"vert.x-eventloop-thread-3"}
{"event":"VERTICLE_DEPLOYED","address":"trim","instant":"2020-10-10T22:44:42.701293Z","id":"a866ffdc-38c8-4da2-bcc0-c9f4881f5139","thread":"vert.x-eventloop-thread-1"}
{"event":"VERTICLE_DEPLOYED","class":"vertx.effect.MyModule","instant":"2020-10-10T22:44:42.703410Z","id":"1473dff2-075c-4fd8-be42-cebcf0a890a0","thread":"vert.x-eventloop-thread-6"}

{"event":"MESSAGE_SENT","to":"removeNull","message":{"a":"  hi  ","b":null,"c":{"d":"  bye  ","e":null}},"instant":"2020-10-10T22:44:42.710447Z","thread":"main"}
{"event":"MESSAGE_RECEIVED","address":"removeNull","instant":"2020-10-10T22:44:42.713981Z","thread":"vert.x-eventloop-thread-4"}
{"event":"RESPONSE_REPLIED","address":"removeNull","message":{"c":{"d":"  bye  "},"a":"  hi  "},"instant":"2020-10-10T22:44:42.723013Z","thread":"vert.x-eventloop-thread-4"}
{"event":"RESPONSE_RECEIVED","from":"removeNull","instant":"2020-10-10T22:44:42.723225Z","thread":"vert.x-eventloop-thread-8"}

{"event":"MESSAGE_SENT","to":"trim","message":{"c":{"d":"  bye  "},"a":"  hi  "},"instant":"2020-10-10T22:44:42.723635Z","thread":"vert.x-eventloop-thread-8"}
{"event":"MESSAGE_RECEIVED","address":"trim","instant":"2020-10-10T22:44:42.724047Z","thread":"vert.x-eventloop-thread-5"}
{"event":"RESPONSE_REPLIED","address":"trim","message":{"a":"hi","c":{"d":"bye"}},"instant":"2020-10-10T22:44:42.728636Z","thread":"vert.x-eventloop-thread-5"}
{"event":"RESPONSE_RECEIVED","from":"trim","instant":"2020-10-10T22:44:42.728902Z","thread":"vert.x-eventloop-thread-8"}

Publishing correlated events

In async event-driven systems is extremely difficult to correlate events. Having this solved is a killer future that saves you from working hours trying to gather all the different events associated with a specific transaction. In vertx-effect is really easy! As always, functions and composition come to the rescue. Before checking out an example, let's see what a Lambdac is:

import io.vertx.core.MultiMap;

public interface Lambdac<I, O> extends BiFunction<MultiMap, I, VIO<O>> {}

A Lambdac is a function that takes two arguments, a map representing the context in which an operation will be executed, and the message of type I sent to the verticle across the event bus. You can put the user's email into the context to filter all the events associated with that email and a random value to distinguish between transactions from the same email. That's only an example.

public class UserAccountModule extends VertxModule {

    public static Lambdac<Integer, Boolean> isLegalAge;
    public static Lambdac<String, Boolean> isValidId;
    public static Lambdac<String, Boolean> isValidEmail;
    public static Lambdac<JsObj, Boolean> isValid;

    private static final String IS_VALID_ID = "isValidId";
    private static final String IS_LEGAL_AGE = "isLegalAge";
    private static final String IS_VALID_EMAIL = "isValidEmail";
    private static final String IS_VALID = "isValid";


    @Override
    protected void deploy() {

        this.deploy(IS_LEGAL_AGE, (Integer age) -> VIO.succeed(age > 16));

        this.deploy(IS_VALID_ID, (String id) -> VIO.succeed(!id.isEmpty()));

        this.deploy(IS_VALID_EMAIL, (String email) -> VIO.succeed(!email.isEmpty()));

        Lambdac<JsObj, Boolean> isValid = (context, obj) ->
                AllExp.par(isLegalAge.apply(context, obj.getInt("age")),
                           isValidId.apply(context, obj.getStr("id")),
                           isValidEmail.apply(context, obj.getStr("email"))
                          );
        this.deploy(IS_VALID, isValid);
    }

    @Override
    protected void initialize() {

        isLegalAge = this.trace(IS_LEGAL_AGE);

        isValidId = this.trace(IS_VALID_ID);

        isValidEmail = this.trace(IS_VALID_EMAIL);

        isValid = this.trace(IS_VALID);
    }
}

As you can see, we've implemented a module that deploys five verticles and exposes five Lambdac to interact with them. The method trace returns a Lambdac (in the previous example, we used the ask method that returns a Lambda). The isValid lambda is implemented using the AllExp expression. The context is passed through all the lambdas of the AllExp expression.


Function<JsObj, MultiMap> context=
        user -> MultiMap.caseInsensitiveMultiMap()
                        .add("email",user.getStr("email"));

        JsObj user = JsObj.of("email",JsStr.of("[email protected]"),
                              "age",JsInt.of(17),
                              "id",JsStr.of("03786761")
                              );

        JsObj user1=JsObj.of("email",JsStr.of("[email protected]"),
                             "age",JsInt.of(10),
                             "id",JsStr.of("03486761")
                            );

        UserAccountModule.isValid
                         .apply(context.apply(user),
                                user)
                         .get();
        
        UserAccountModule.isValid
                         .apply(context.apply(user1),
                                user1)
                         .get();

Let's take a look at the events that are published during the execution of the previous code:

[
  {
    "event": "MESSAGE_SENT",
    "to": "isValid",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": {
      "email": "[email protected]",
      "age": 10,
      "id": "03486761"
    },
    "instant": "2020-10-11T15:09:26.704145Z",
    "thread": "main"
  },
  {
    "event": "MESSAGE_RECEIVED",
    "address": "isValid",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "instant": "2020-10-11T15:09:26.708157Z",
    "thread": "vert.x-eventloop-thread-8"
  },
  {
    "event": "MESSAGE_SENT",
    "to": "isValid",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": {
      "email": "[email protected]",
      "age": 17,
      "id": "03786761>"
    },
    "instant": "2020-10-11T15:09:26.708597Z",
    "thread": "main"
  },
  {
    "event": "MESSAGE_SENT",
    "to": "isLegalAge",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": 10,
    "instant": "2020-10-11T15:09:26.709568Z",
    "thread": "vert.x-eventloop-thread-8"
  },
  {
    "event": "MESSAGE_RECEIVED",
    "address": "isLegalAge",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "instant": "2020-10-11T15:09:26.710185Z",
    "thread": "vert.x-eventloop-thread-4"
  },
  {
    "event": "MESSAGE_SENT",
    "to": "isValidId",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": "03486761",
    "instant": "2020-10-11T15:09:26.710136Z",
    "thread": "vert.x-eventloop-thread-8"
  },
  {
    "event": "MESSAGE_SENT",
    "to": "isValidEmail",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": "[email protected]",
    "instant": "2020-10-11T15:09:26.710672Z",
    "thread": "vert.x-eventloop-thread-8"
  },
  {
    "event": "MESSAGE_RECEIVED",
    "address": "isValidId",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "instant": "2020-10-11T15:09:26.710713Z",
    "thread": "vert.x-eventloop-thread-5"
  },
  {
    "event": "MESSAGE_RECEIVED",
    "address": "isValidEmail",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "instant": "2020-10-11T15:09:26.711165Z",
    "thread": "vert.x-eventloop-thread-6"
  },
  {
    "event": "MESSAGE_RECEIVED",
    "address": "isValid",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "instant": "2020-10-11T15:09:26.711854Z",
    "thread": "vert.x-eventloop-thread-8"
  }
  {
    "event": "MESSAGE_SENT",
    "to": "isLegalAge",
    "context": {
      "email": [
        "[email protected]"
      ]
    },
    "message": 17,
    "instant": "2020-10-11T15:09:26.712138Z",
    "thread": "vert.x-eventloop-thread-8"
  }
]

Spawning verticles

With vertx-effect, you can spawn verticles, which means that verticles are deployed and undeployed on the fly. Every time something needs to be computed, a new verticle is deployed. When the computation is done and the verticle replies, it is undeployed right away. The goal is to get the most out of the cores! Erlang taught us how to develop concurrent software that doubles in speed if you double the number of cores without changing a code line: spawning as many verticles as possible. In Erlang jargon, a verticle is kind of a process.

Will deploy and undeploy verticles continuously slow down the system? It depends, like everything related to performance. There are times when the cost of reaching a greater level of parallelization is worth it. Other times it's not. Let's see how long it takes to deploy and undeploy one million verticles:


@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void deploy_undeploy()throws InterruptedException{
        
        int processes = 1000000;
        CountDownLatch latch = new CountDownLatch(processes);

        for(int i=0; i<processes; i++) {
          vertxRef.deploy("id"+i,
                           Lambda.<JsObj>identity()
                         )
                  .onComplete( vr -> {
                                    vr.result()
                                      .undeploy()
                                      .onSuccess(it->latch.countDown());
                                     }
                             )
                  .get();
        }

        latch.await(10,SECONDS);

        }

It takes almost three seconds, 3 microseconds per verticle:

Benchmark                  Mode  Cnt  Score   Error  Units
Processes.deploy_undeploy  avgt   10  2.907 ± 0.658   s/op

Http Client

Here's a comprehensive example demonstrating the effortless creation of an HTTP server, deployment of a handler using stubs, creation of an HTTP client module, and the sending of both GET and POST requests. Requesting data is simplified to the extent that it merely involves invoking lambdas that return HTTP responses encapsulated in JsObj.

@ExtendWith(VertxExtension.class)
public class HttpClientMethodsTests {

    private static final int PORT = Port.number.incrementAndGet();
    static HttpClientModule httpClient;

    @BeforeAll
    public static void prepare(final Vertx vertx,
                               final VertxTestContext context
                              ) {
        VertxRef vertxRef = new VertxRef(vertx);
        vertxRef.registerConsumer(VertxRef.EVENTS_ADDRESS,
                                  System.out::println
                                 );
        httpClient = new HttpClientModule(new HttpClientOptions().setDefaultHost("0.0.0.0"),
                                          "myhttp-client");

        HttpRespStub mockReqResp =
                HttpRespStub.when(ALWAYS)
                            .setBodyResp(n -> body -> req -> JsObj.of("req_method",
                                                                      JsStr.of(req.method()
                                                                                  .name()
                                                                              ),
                                                                      "req_body",
                                                                      JsStr.of(body.toString()),
                                                                      "req_uri",
                                                                      JsStr.of(req.uri())
                                                                     )
                                                                  .toPrettyString()
                                        )
                            .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE);

        MapExp.seq("json-values-codecs",
                   vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                   "http-server",
                   new HttpServerBuilder(vertx,
                                         new HttpReqHandlerStub(mockReqResp)
                   ).create(PORT),
                   "http-client",
                   vertxRef.deployVerticle(httpClient)
                  )
              .get()
              .onComplete(Verifiers.pipeTo(context));


    }


    @Test
    public void testGet(VertxTestContext context) {
        VIO<JsObj> getReq = httpClient.get.apply(HttpHeaders.headers()
                                                            .set("method",
                                                                 "get"
                                                                ),
                                                 new GetReq().port(PORT)
                                                             .uri("example")
                                                );
        Verifiers.<JsObj>verifySuccess(resp -> {
                     int status = HttpResp.STATUS_CODE_LENS.get.apply(resp);
                     String bodyResp = HttpResp.STR_BODY_LENS.get.apply(resp);
                     JsObj bodyJsObj = JsObj.parse(bodyResp);
                     return bodyJsObj.getStr("req_method")
                                     .equals("GET")
                            && bodyJsObj.getStr("req_uri")
                                        .equals("example")
                            && bodyJsObj.getStr("req_body").isEmpty()
                            && status == 200;
                 })
                 .accept(getReq,
                         context
                        );

    }

    @Test
    public void testPost(VertxTestContext context) {
        VIO<JsObj> postReq = httpClient.post.apply(HttpHeaders.headers()
                                                              .set("method",
                                                                   "post"
                                                                  ),
                                                   new PostReq("hi".getBytes())
                                                           .port(PORT)
                                                           .uri("example")
                                                  );
        Verifiers.<JsObj>verifySuccess(resp -> {
                     Integer status = HttpResp.STATUS_CODE_LENS.get.apply(resp);
                     String bodyResp = HttpResp.STR_BODY_LENS.get.apply(resp);
                     JsObj bodyJsObj = JsObj.parse(bodyResp);
                     return bodyJsObj.getStr("req_method")
                                     .equals("POST")
                            && bodyJsObj.getStr("req_uri")
                                        .equals("example")
                            && bodyJsObj.getStr("req_body")
                                        .equals("hi")
                            && status == 200;
                 })
                 .accept(postReq,
                         context
                        );
    }


}

You can harness the power of the VIO API to make requests with retry policies, as illustrated in the following example:

@ExtendWith(VertxExtension.class)
public class HttpClientTestRetryOnFailure {

    private static final int PORT = Port.number.incrementAndGet();

    static HttpClientModule httpClient;
    static VertxRef vertxRef;
    static HttpReqHandlerStub httpReqHandlerStub;

    @BeforeAll
    public static void prepare(final Vertx vertx,
                               final VertxTestContext context
                              ) {
        vertxRef = new VertxRef(vertx);

        vertxRef.registerConsumer(VertxRef.EVENTS_ADDRESS,
                                  System.out::println
                                 );
        httpClient = new HttpClientModule(new HttpClientOptions().setDefaultHost("0.0.0.0"),
                                          "myhttp-client");

        HttpRespStub mockReqErrorResp =
                HttpRespStub.when((n, req) -> n <= 3)
                            .setStatusCodeResp(n -> body -> req -> 500)
                            .setBodyResp(n -> body -> req -> "{}")
                            .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE);

        HttpRespStub mockReqErrorSuccess =
                HttpRespStub.when((n, req) -> n > 3)
                            .setStatusCodeResp(n -> body -> req -> 200)
                            .setBodyResp(n -> body -> req -> "{}")
                            .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE);

        httpReqHandlerStub = new HttpReqHandlerStub(mockReqErrorResp,
                                                    mockReqErrorSuccess
        );
        TripleExp.seq(vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                      new HttpServerBuilder(vertx,
                                            httpReqHandlerStub
                      ).create(PORT),
                      vertxRef.deployVerticle(httpClient)
                     )
                 .get()
                 .onComplete(Verifiers.pipeTo(context));
    }


    @Test
    public void test_retries(VertxTestContext context) {
        Verifiers.<JsObj>verifySuccess(resp -> {
                     Integer status = HttpResp.STATUS_CODE_LENS.get.apply(resp);
                     return status == 200;
                 })
                 .accept(httpClient.get.apply(new GetReq().port(PORT)
                                                          .uri("example")
                                             )
                                       .repeat(resp -> HttpResp.STATUS_CODE_LENS.get.apply(resp) == 500,
                                               limitRetries(3)
                                              ),
                         context
                        );

    }


    @Test
    public void test_retries_constant_delay(VertxTestContext context) {
        httpReqHandlerStub.resetCounter();

        long tic = Instant.now()
                          .toEpochMilli();

        VIO<JsObj> getReq =
                httpClient.get.apply(new GetReq().port(PORT)
                                                 .uri("example")
                                    )
                              .repeat(resp -> HttpResp.STATUS_CODE_LENS.get.apply(resp) == 500,
                                      limitRetries(3).append(constantDelay(vertxRef.delay(Duration.ofMillis(100))))
                                     );

        Verifiers.<JsObj>verifySuccess(resp -> {
            long elapsed = Instant.now()
                                  .toEpochMilli() - tic;
            int status = HttpResp.STATUS_CODE_LENS.get.apply(resp);
            System.out.println(elapsed);
            return status == 200 && elapsed >= 300;
        }).accept(getReq,
                  context
                 );
    }

}

Oauth Http client

Creating an HTTP client with OAuth client credentials support is made exceptionally straightforward, liberating you from the intricacies of obtaining and refreshing tokens. The provided code showcases how to easily set up a resilient HTTP client using the VIO API, complete with token retrieval and automatic refresh. This not only streamlines the process but also enables you to seamlessly integrate retry policies and other reliability features.

@ExtendWith(VertxExtension.class)
public class ClientCredentialsModuleTest {

    static ClientCredentialsModule httpClient;
    static ClientCredentialsModuleBuilder builder;
    static VertxRef vertxRef;
    static int port = Port.number.incrementAndGet();


    @BeforeAll
    public static void prepare(final Vertx vertx,
                               final VertxTestContext context
                              ) {
        builder =
                new ClientCredentialsModuleBuilder(new HttpClientOptions().setDefaultPort(port)
                                                                          .setDefaultHost("0.0.0.0"),
                                                   "my-httpclient",
                                                   new AccessTokenRequest("client_id",
                                                                          "client_secret"
                                                   ));

        httpClient = builder.createModule();

        vertxRef = new VertxRef(vertx);

        vertxRef.registerConsumer(VertxRef.EVENTS_ADDRESS,
                                  System.out::println
                                 );

        PairExp.seq(vertxRef.deployVerticle(new RegisterJsValuesCodecs()),
                    vertxRef.deployVerticle(httpClient)
                   )
               .onComplete(Verifiers.pipeTo(context))
               .get();

    }


    @Test
    public void test_get_success_after_three_retries_getting_token(Vertx vertx,
                                                                   VertxTestContext context
                                                                  ) {


        builder.setAccessTokenReqRetryPolicy(e -> true,
                                             RetryPolicies.limitRetries(3)
                                            );

        List<HttpRespStub> httpRespStubs =
                List.of(when(REQ_LET.apply(3))
                                .setBodyResp(HttpBodyRespStub.cons(JsObj.of("token_found",
                                                                            FALSE
                                                                           )
                                                                  )
                                            )
                                .setStatusCodeResp(HttpStatusCodeRespStub._401)
                                .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE),
                        when(FORTH_REQ)
                                .setBodyResp(HttpBodyRespStub.cons(JsObj.of("token_found",
                                                                            TRUE,
                                                                            "access_token",
                                                                            JsStr.of("foooo")
                                                                           )
                                                                  )
                                            )
                                .setStatusCodeResp(HttpStatusCodeRespStub._200)
                                .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE),
                        when(REQ_GT.apply(4))
                                .setBodyResp(HttpBodyRespStub.cons(JsObj.of("name",
                                                                            JsStr.of("Rafael")
                                                                           ))
                                            )
                                .setStatusCodeResp(HttpStatusCodeRespStub._200)
                                .setHeadersResp(HttpHeadersRespStub.JSON_CONTENT_TYPE));

        VIO<JsObj> getReq = httpClient.getOauth.apply(new GetReq().uri("/name"));

        new HttpServerBuilder(vertx,
                              new HttpReqHandlerStub(httpRespStubs

                              )
        ).create(port)
         .get()
         .onSuccess(server -> {
             Verifiers.<JsObj>verifySuccess(resp -> HttpResp.STATUS_CODE_LENS.get.apply(resp) == 200)
                      .accept(getReq,
                              context
                             );
         });

    }

  @Test
  public void test_get_success_after_three_retries(Vertx vertx,
                                                   VertxTestContext context
                                                  ) {


    List<HttpRespStub> httpRespStubs =
            List.of(when(REQ_LET.apply(3))
                            .setBodyResp(HttpBodyRespStub.cons(JsObj.of("token_found",
                                                                        FALSE
                                                                       )
                                                              )
                                        )
                            .setStatusCodeResp(HttpStatusCodeRespStub._401),
                    when(FORTH_REQ)
                            .setBodyResp(HttpBodyRespStub.cons(JsObj.of("token_found",
                                                                        TRUE,
                                                                        "access_token",
                                                                        JsStr.of("foooo")
                                                                       )
                                                              )
                                        )
                            .setStatusCodeResp(HttpStatusCodeRespStub._200),
                    when(REQ_LET.apply(7))
                            .setBodyResp(c -> body -> req -> {
                                           req.response().close();
                                           return "{}";
                                         }
                                        )
                            .setStatusCodeResp(HttpStatusCodeRespStub._500),
                    when(REQ_GT.apply(7))
                            .setBodyResp(HttpBodyRespStub.cons(JsObj.of("name",
                                                                        JsStr.of("Rafael")
                                                                       ))
                                        )
                            .setStatusCodeResp(HttpStatusCodeRespStub._200));

    VIO<JsObj> getReq = httpClient.getOauth.apply(new GetReq().uri("/name"))
                                           .retry(RetryPolicies.limitRetries(3));

    new HttpServerBuilder(vertx,
                          new HttpReqHandlerStub(httpRespStubs)
    ).create(port)
     .get()
     .onSuccess(server -> {
       Verifiers.<JsObj>verifySuccess(resp -> HttpResp.STATUS_CODE_LENS.get.apply(resp) == 200)
                .accept(getReq,
                        context
                       );
     });


  }
}

Http server

TO be documented but implemented!

Testing

VIO stubs

TO be documented but implemented!

Http server stubs

TO be documented but implemented!

Requirements

Java 17 or greater

Installation

<dependency>
    <groupId>com.github.imrafaelmerino</groupId>
    <artifactId>vertx-effect</artifactId>
    <version>4.0.0</version>
</dependency>

Java 21 or greater

Installation

<dependency>
    <groupId>com.github.imrafaelmerino</groupId>
    <artifactId>vertx-effect</artifactId>
    <version>5.0.0</version>
</dependency>

Related projects

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages