EventBus

Frieder Reinhold edited this page Jan 17, 2024 · 5 revisions

When to use?

Correo is an application for event systems, so it makes sense that something similar is used internally to distribute information. While this is very helpful in some situations, it is also a tool that must not be overused. If you only need to listen to some callbacks of a task, for example inside your view, please use the future-like interface of tasks instead of the EventBus. So when does it make sense to use the EventBus? In cases where different parts of the application need to be informed. A good example is the connection lifecycle: nearly everything in Correo is related to connections, so events about this topic are a good use case for the EventBus.

Concept of EventBus

The EventBus provides a pub/sub (observer) pattern in Java. If you are familiar with Jakarta EE, it works a little like the Jakarta EE Event Specification.

There are events that can be fired, and listeners that can be registered and are triggered by those events. Events can be fired asynchronously (in contrast to Jakarta EE, it is currently not possible to listen asynchronously). Events can also be filtered by comparing a defined identifier of the event with a defined identifier of the listener.
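To make the pattern concrete, here is a minimal, self-contained sketch of an annotation-driven bus. It is not Correo's implementation: `ToyBus`, `@OnEvent`, `ConnectedEvent` and `StatusView` are hypothetical names invented for this illustration, and the real EventBus may dispatch differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in annotation; this is NOT Correo's @Subscribe.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnEvent {
    Class<?> value();
}

// Toy bus: keeps listeners in a list and dispatches via reflection.
class ToyBus {
    private final List<Object> listeners = new CopyOnWriteArrayList<>();

    void register(Object listener) { listeners.add(listener); }

    void unregister(Object listener) { listeners.remove(listener); }

    // Calls every annotated no-arg method whose declared event type matches.
    void fire(Object event) {
        for (Object listener : listeners) {
            for (Method m : listener.getClass().getDeclaredMethods()) {
                OnEvent on = m.getAnnotation(OnEvent.class);
                if (on != null && on.value().isInstance(event) && m.getParameterCount() == 0) {
                    try {
                        m.setAccessible(true);
                        m.invoke(listener);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }
}

// Hypothetical event type for the demo.
class ConnectedEvent {}

// Hypothetical listener that records what it was told.
class StatusView {
    final List<String> log = new ArrayList<>();

    @OnEvent(ConnectedEvent.class)
    void onConnected() { log.add("connected"); }
}

class ToyBusDemo {
    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        StatusView view = new StatusView();
        bus.register(view);
        bus.fire(new ConnectedEvent());   // matches the subscription, delivered
        bus.fire("some other event");     // no matching subscription, ignored
        System.out.println(view.log);     // [connected]
    }
}
```

The sender only knows the bus and the event type, and the listener only knows the event type, which is what makes the pattern useful for loosely coupled parts of an application.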

Register and unregister a listener

To register an instance of a class as a listener, the instance must be passed to the EventBus. Usually this should be done in the constructor of your class. To avoid listener leaks, you must ensure that the instance is removed from the EventBus once it is no longer relevant. If you do not, the EventBus keeps a reference to it and the garbage collector can never reclaim it. Classes in Correo usually have a cleanup method that is called by a superior class or by an event such as a window close request.

import org.correomqtt.business.eventbus.EventBus;

public class SomeClazz {
    public SomeClazz() {
        EventBus.register(this);
    }

    public void cleanup(){
        EventBus.unregister(this);
    }
}

It is not necessary to implement an interface or anything similar. However, registering alone does nothing: without subscribing to events, your class will never be called.
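The leak described above can be illustrated with a small stand-alone sketch. `ToyRegistry` and `TabView` are hypothetical names, and the registry only assumes what the text states: that a bus holds a strong reference to every registered listener until it is unregistered.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical registry standing in for a bus that holds strong references.
class ToyRegistry {
    private final Set<Object> listeners =
            Collections.newSetFromMap(new IdentityHashMap<>());

    void register(Object listener) { listeners.add(listener); }

    void unregister(Object listener) { listeners.remove(listener); }

    int size() { return listeners.size(); }
}

// Hypothetical view that registers itself in the constructor, as recommended above.
class TabView {
    private final ToyRegistry registry;

    TabView(ToyRegistry registry) {
        this.registry = registry;
        registry.register(this);
    }

    void cleanup() { registry.unregister(this); }
}

class LeakDemo {
    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        TabView closedProperly = new TabView(registry);
        TabView forgotten = new TabView(registry);

        closedProperly.cleanup();
        forgotten = null; // dropped without cleanup(): the registry still references it

        System.out.println(registry.size()); // 1
    }
}
```

Although the program no longer holds a reference to the second view, the registry does, so that view stays reachable and cannot be collected.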

Subscribing to Events

To be triggered when a specific event is fired, use the @Subscribe annotation. It can be placed either on the method or on a method parameter, and it points to the event class (or classes) to listen for. It cannot be used in both places for the same method. The event itself can also be passed as a parameter to the listener method, but only if the method listens to exactly one event.

Example #1

In this case the method is called when a PublishEvent is fired. The event itself is not used.

    @Subscribe(PublishEvent.class)
    public void onMessagePublished(){

    }

Example #2

Here the method is called with the event parameter.

    public void onMessagePublished(@Subscribe PublishEvent event){
        String topic = event.getMessageDTO().getTopic();
        // do something
    }

Alternatively, the same can be achieved like this:

    @Subscribe(PublishEvent.class)
    public void onMessagePublished(PublishEvent event){
        String topic = event.getMessageDTO().getTopic();
        // do something
    }

Example #3

In this example we listen to multiple events at once. Because each event has a different type, it is not possible to receive the event object. If you need it, implement one method per event.

    @Subscribe({
            ScriptExecutionCancelledEvent.class,
            ScriptExecutionSuccessEvent.class,
            ScriptExecutionProgressEvent.class,
            ScriptExecutionFailedEvent.class,
            ScriptExecutionsDeletedEvent.class
    })
    public void onScriptExecutionFinished() {
        // do something
    }

Firing Events

If you want to fire an event, you have to create an event class for it. There is only one requirement: your class must implement Event.

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.correomqtt.business.eventbus.Event;

@AllArgsConstructor
@Getter
public class PublishEvent implements Event {
    private MessageDTO messageDTO;
}

In this case we use Lombok, but it is also possible to use a Java record.
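As a rough sketch of the record variant (Java 16 or later), the event could look like the following. The `Event` interface and `MessageDTO` declared here are hypothetical stand-ins so the example compiles on its own; in Correo you would use the real types instead.

```java
// Hypothetical stand-ins so the sketch compiles on its own; in Correo these
// would be org.correomqtt.business.eventbus.Event and the real MessageDTO.
interface Event {}

record MessageDTO(String topic) {}

// Record-based event: the compiler generates the constructor and the accessor.
record PublishEvent(MessageDTO messageDTO) implements Event {}

class RecordEventDemo {
    public static void main(String[] args) {
        PublishEvent event = new PublishEvent(new MessageDTO("sensors/temp"));
        // Record accessors drop the "get" prefix: messageDTO(), not getMessageDTO().
        System.out.println(event.messageDTO().topic()); // sensors/temp
    }
}
```

Note that listeners written against the Lombok version call `getMessageDTO()`, while a record exposes `messageDTO()`, so switching styles means adjusting the call sites.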

Afterwards this event can be fired like this:

EventBus.fire(new PublishEvent(myMessageDTO));

This sends the event synchronously, which means that all registered listeners for this event are called and have finished before fire returns.

To fire an event asynchronously, use fireAsync:

EventBus.fireAsync(new PublishEvent(myMessageDTO));
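The difference between the two modes can be sketched with a toy bus. This is an illustration only: `FireModesBus` is a hypothetical class, and the use of `CompletableFuture` (including the returned future) is an assumption of this sketch, not a statement about how Correo's fireAsync is implemented or what it returns.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy bus illustrating the two firing modes; not Correo's implementation.
class FireModesBus {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void register(Consumer<String> listener) { listeners.add(listener); }

    // Synchronous: returns only after every listener has run on the caller's thread.
    void fire(String event) {
        listeners.forEach(l -> l.accept(event));
    }

    // Asynchronous: hands the dispatch to another thread and returns immediately.
    CompletableFuture<Void> fireAsync(String event) {
        return CompletableFuture.runAsync(() -> fire(event));
    }
}

class FireModesDemo {
    public static void main(String[] args) {
        FireModesBus bus = new FireModesBus();
        List<String> seen = new CopyOnWriteArrayList<>();
        bus.register(e -> seen.add(e + " on " + Thread.currentThread().getName()));

        bus.fire("sync");                 // listener has already run when this returns
        System.out.println(seen.size());  // 1

        CompletableFuture<Void> done = bus.fireAsync("async");
        done.join();                      // wait here only so the demo can print the result
        System.out.println(seen.size());  // 2
    }
}
```

With the synchronous call, the caller can rely on all listeners having finished. With the asynchronous call, it cannot unless it explicitly waits.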

Filtering

In some cases an event of a specific type should be received only if a certain condition is met. In that case, use the @SubscribeFilter annotation.

The annotation must be parameterized with a filter name. If an event and a listener for that event type both declare a filter with the same name, the return values of the two filter methods are compared, and the event is delivered only if they are equal. There are some predefined filter names in SubscribeFilterNames, but you can also use your own names.

First, we have to extend the event.

import lombok.AllArgsConstructor;
import lombok.Getter;
import org.correomqtt.business.eventbus.Event;
import org.correomqtt.business.eventbus.SubscribeFilter;

import static org.correomqtt.business.eventbus.SubscribeFilterNames.CONNECTION_ID;

@AllArgsConstructor
@Getter
public class PublishEvent implements Event {
    private String connectionId;
    private MessageDTO messageDTO;

    @SubscribeFilter(CONNECTION_ID)
    public String getConnectionId(){
        return connectionId;
    }

}

The listener class is extended in the same way.

import org.correomqtt.business.eventbus.EventBus;
import org.correomqtt.business.eventbus.Subscribe;
import org.correomqtt.business.eventbus.SubscribeFilter;
import static org.correomqtt.business.eventbus.SubscribeFilterNames.CONNECTION_ID;

public class SomeClazz {

    private final String connectionId;

    public SomeClazz(String connectionId) {
        this.connectionId = connectionId;
        EventBus.register(this);
    }

    @Subscribe(PublishEvent.class)
    public void onMessagePublished(){
        // do something
    }

    @SubscribeFilter(CONNECTION_ID)
    public String getConnectionId() {
        return connectionId;
    }

    public void cleanup(){
        EventBus.unregister(this);
    }
}

As a result, when a PublishEvent is fired, only those instances of SomeClazz whose connectionId equals the connectionId of the event are triggered.
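The comparison rule can be sketched roughly as follows. `@ToyFilter`, `FilterMatcher`, `ToyPublishEvent` and `ToyListener` are hypothetical names, and the sketch assumes that a listener without a matching filter name still receives the event; the real EventBus may behave differently in that case.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for @SubscribeFilter.
@Retention(RetentionPolicy.RUNTIME)
@interface ToyFilter {
    String value();
}

class FilterMatcher {
    // Collects filter name -> value by invoking every @ToyFilter method on the target.
    static Map<String, Object> filterValues(Object target) {
        Map<String, Object> values = new HashMap<>();
        for (Method m : target.getClass().getDeclaredMethods()) {
            ToyFilter filter = m.getAnnotation(ToyFilter.class);
            if (filter == null) continue;
            try {
                m.setAccessible(true);
                values.put(filter.value(), m.invoke(target));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
        return values;
    }

    // Deliver unless a filter name shared by both sides yields different values.
    static boolean shouldDeliver(Object event, Object listener) {
        Map<String, Object> listenerValues = filterValues(listener);
        for (Map.Entry<String, Object> e : filterValues(event).entrySet()) {
            if (listenerValues.containsKey(e.getKey())
                    && !Objects.equals(e.getValue(), listenerValues.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }
}

class ToyPublishEvent {
    private final String connectionId;
    ToyPublishEvent(String connectionId) { this.connectionId = connectionId; }

    @ToyFilter("connectionId")
    String getConnectionId() { return connectionId; }
}

class ToyListener {
    private final String connectionId;
    ToyListener(String connectionId) { this.connectionId = connectionId; }

    @ToyFilter("connectionId")
    String getConnectionId() { return connectionId; }
}

class FilterDemo {
    public static void main(String[] args) {
        ToyPublishEvent event = new ToyPublishEvent("conn-1");
        System.out.println(FilterMatcher.shouldDeliver(event, new ToyListener("conn-1"))); // true
        System.out.println(FilterMatcher.shouldDeliver(event, new ToyListener("conn-2"))); // false
    }
}
```

Matching on the filter name rather than on the method name means the event and the listener can name their filter methods independently.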

Important note

Events are usually fired from a background thread rather than from the FX application thread. For that reason, asynchronous events are automatically pushed to the FX application thread, so there is no need to call Platform.runLater before doing UI work. Synchronous events, on the other hand, are designed to wait for the listeners to finish their work, so they are NOT pushed to the FX application thread. Calling Platform.runLater inside such a listener would make the handling effectively asynchronous.