Skip to content

Commit

Permalink
🐳 [EventBus] 将抽象类 AbstractEventBusRunner 标记为过时的,由接口 EventBusRunner 代替。
Browse files Browse the repository at this point in the history
[EventBus] 增加 EventBus 模块相关 javadoc
  • Loading branch information
iohao committed Jun 7, 2024
1 parent aff3b6a commit e8afd4a
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,74 +18,13 @@
*/
package com.iohao.game.action.skeleton.eventbus;

import com.iohao.game.action.skeleton.core.BarSkeleton;
import com.iohao.game.action.skeleton.core.SkeletonAttr;
import com.iohao.game.action.skeleton.core.commumication.BrokerClientContext;
import com.iohao.game.action.skeleton.core.runner.Runner;
import com.iohao.game.action.skeleton.protocol.processor.SimpleServerInfo;

import java.util.Set;

/**
* 分布式事件总线 Runner
*
* @author 渔民小镇
* @date 2023-12-24
* @deprecated 请使用 {@link EventBusRunner} 代替
*/
public abstract class AbstractEventBusRunner implements Runner {
@Override
public void onStart(BarSkeleton skeleton) {
// BrokerClient,当前逻辑服引用
BrokerClientContext brokerClientContext = skeleton.option(SkeletonAttr.brokerClientContext);
String brokerClientId = brokerClientContext.getId();

EventBrokerClientMessage eventBrokerClientMessage = getEventBrokerClientMessage(brokerClientContext);

// EventBus 是逻辑服事件总线。 EventBus、业务框架、逻辑服三者是 1:1:1 的关系。
EventBus eventBus = new EventBus(brokerClientId);
skeleton.option(SkeletonAttr.eventBus, eventBus);

// EventBus 默认设置
eventBus.setSubscribeExecutorStrategy(SubscribeExecutorStrategy.defaultInstance());
eventBus.setSubscriberInvokeCreator(SubscriberInvokeCreator.defaultInstance());
eventBus.setEventBusMessageCreator(EventBusMessageCreator.defaultInstance());
eventBus.setEventBusListener(EventBusListener.defaultInstance());
eventBus.setExecutorRegion(skeleton.getExecutorRegion());

eventBus.setBrokerClientContext(brokerClientContext);
eventBus.setEventBrokerClientMessage(eventBrokerClientMessage);

// EventBus 注册订阅者
this.registerEventBus(eventBus, skeleton);

Set<String> topic = eventBus.listTopic();
eventBrokerClientMessage.setEventTopicMessage(new EventTopicMessage(topic));

eventBus.setStatus(EventBus.EventBusStatus.run);

EventBusRegion.addLocal(eventBus);
}

private EventBrokerClientMessage getEventBrokerClientMessage(BrokerClientContext brokerClientContext) {
SimpleServerInfo simpleServerInfo = brokerClientContext.getSimpleServerInfo();
String id = simpleServerInfo.getId();
String appName = simpleServerInfo.getName();
String tag = simpleServerInfo.getTag();
String typeName = simpleServerInfo.getBrokerClientType();

return new EventBrokerClientMessage(appName, tag, typeName, id);
}

/**
* 可在此方法中注册订阅者
* example
* <pre>{@code
* eventBus.register(new YourEventBusSubscriber());
* }
* </pre>
*
* @param eventBus EventBus
* @param skeleton 业务框架
*/
abstract protected void registerEventBus(EventBus eventBus, BarSkeleton skeleton);
}
@Deprecated
public abstract class AbstractEventBusRunner implements EventBusRunner {
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public final class EventBus {
public void register(Object eventBusSubscriber) {

if (status != EventBusStatus.register) {
throw new RuntimeException("运行中不允许注册订阅者,请在 AbstractEventRunner.registerEventBus 方法中注册。 ");
throw new RuntimeException("运行中不允许注册订阅者,请在 EventRunner.registerEventBus 方法中注册。 ");
}

// 注册
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EventBusLocalRegion {
* value : 订阅者集合。(当前进程内的所有订阅者)
* </pre>
*/
final ListMultiMap<Class<?>, Subscriber> subscriberListMap = ListMultiMap.create();
final ListMultiMap<Class<?>, Subscriber> subscriberListMap = ListMultiMap.of();

public EventBus getEventBus(String brokerClientId) {
return eventBusMap.get(brokerClientId);
Expand Down Expand Up @@ -80,7 +80,7 @@ void addLocal(EventBus eventBus) {

private void resetLocalSubscriber() {

ListMultiMap<Class<?>, Subscriber> tempMultiMap = ListMultiMap.create();
ListMultiMap<Class<?>, Subscriber> tempMultiMap = ListMultiMap.of();
for (EventBus eventBus : eventBusMap.values()) {

SubscriberRegistry subscriberRegistry = eventBus.subscriberRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class EventBusRemoteRegion {
* value : across progress id
* </pre>
*/
final SetMultiMap<String, EventBrokerClientMessage> remoteTopicMultiMap = SetMultiMap.create();
final SetMultiMap<String, EventBrokerClientMessage> remoteTopicMultiMap = SetMultiMap.of();

/**
* 其他进程逻辑服的信息
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* ioGame
* Copyright (C) 2021 - present 渔民小镇 ([email protected][email protected]) . All Rights Reserved.
* # iohao.com . 渔民小镇
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.iohao.game.action.skeleton.eventbus;

import com.iohao.game.action.skeleton.core.BarSkeleton;
import com.iohao.game.action.skeleton.core.SkeletonAttr;
import com.iohao.game.action.skeleton.core.commumication.BrokerClientContext;
import com.iohao.game.action.skeleton.core.runner.Runner;
import com.iohao.game.action.skeleton.protocol.processor.SimpleServerInfo;

import java.util.Set;

/**
* 分布式事件总线 Runner
*
* @author 渔民小镇
* @date 2024-06-06
* @since 21.10
*/
public interface EventBusRunner extends Runner {
@Override
default void onStart(BarSkeleton skeleton) {
// BrokerClient,当前逻辑服引用
BrokerClientContext brokerClientContext = skeleton.option(SkeletonAttr.brokerClientContext);
String brokerClientId = brokerClientContext.getId();

EventBrokerClientMessage eventBrokerClientMessage = getEventBrokerClientMessage(brokerClientContext);

// EventBus 是逻辑服事件总线。 EventBus、业务框架、逻辑服三者是 1:1:1 的关系。
EventBus eventBus = new EventBus(brokerClientId);
skeleton.option(SkeletonAttr.eventBus, eventBus);

// EventBus 默认设置
eventBus.setSubscribeExecutorStrategy(SubscribeExecutorStrategy.defaultInstance());
eventBus.setSubscriberInvokeCreator(SubscriberInvokeCreator.defaultInstance());
eventBus.setEventBusMessageCreator(EventBusMessageCreator.defaultInstance());
eventBus.setEventBusListener(EventBusListener.defaultInstance());
eventBus.setExecutorRegion(skeleton.getExecutorRegion());

eventBus.setBrokerClientContext(brokerClientContext);
eventBus.setEventBrokerClientMessage(eventBrokerClientMessage);

// EventBus 注册订阅者
this.registerEventBus(eventBus, skeleton);

Set<String> topic = eventBus.listTopic();
eventBrokerClientMessage.setEventTopicMessage(new EventTopicMessage(topic));

eventBus.setStatus(EventBus.EventBusStatus.run);

EventBusRegion.addLocal(eventBus);
}

private EventBrokerClientMessage getEventBrokerClientMessage(BrokerClientContext brokerClientContext) {
SimpleServerInfo simpleServerInfo = brokerClientContext.getSimpleServerInfo();
String id = simpleServerInfo.getId();
String appName = simpleServerInfo.getName();
String tag = simpleServerInfo.getTag();
String typeName = simpleServerInfo.getBrokerClientType();

return new EventBrokerClientMessage(appName, tag, typeName, id);
}

/**
* 可在此方法中注册订阅者
* example
* <pre>{@code
* eventBus.register(new YourEventBusSubscriber());
* }
* </pre>
*
* @param eventBus EventBus
* @param skeleton 业务框架
*/
void registerEventBus(EventBus eventBus, BarSkeleton skeleton);
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void initRemoteMessages() {

@FieldDefaults(level = AccessLevel.PRIVATE)
final class AnyTagView {
SetMultiMap<String, AnyTagBrokerClient> topicMultiMap = SetMultiMap.create();
SetMultiMap<String, AnyTagBrokerClient> topicMultiMap = SetMultiMap.of();

AnyTagViewData getAnyTagData(EventBusMessage message) {
String topic = message.getTopic();
Expand All @@ -182,7 +182,7 @@ AnyTagViewData getAnyTagData(EventBusMessage message) {

void reload(Collection<AnyTagBrokerClient> values) {
// 重新加载
SetMultiMap<String, AnyTagBrokerClient> tempMultiMap = SetMultiMap.create();
SetMultiMap<String, AnyTagBrokerClient> tempMultiMap = SetMultiMap.of();

for (AnyTagBrokerClient anyTagBrokerClient : values) {
anyTagBrokerClient.streamEventBrokerClientMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,10 @@ public void invoke(EventBusMessage eventBusMessage) {
}

void invoke(Object param) {
MethodAccess methodAccess = this.subscriber.getMethodAccess();
Object target = this.subscriber.getTarget();
int methodIndex = this.subscriber.getMethodIndex();

// 调用开发者在 action 类中编写的业务方法,即 action
MethodAccess methodAccess = this.subscriber.getMethodAccess();
methodAccess.invoke(target, methodIndex, param);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@FieldDefaults(level = AccessLevel.PACKAGE)
final class SubscriberRegistry {
static final AtomicLong subscriberId = new AtomicLong();
final ListMultiMap<Class<?>, Subscriber> subscriberMultiMap = ListMultiMap.create();
final ListMultiMap<Class<?>, Subscriber> subscriberMultiMap = ListMultiMap.of();
final Set<Class<?>> eventBusSubscriberSet = new NonBlockingHashSet<>();

void register(Object eventBusSubscriber, SubscriberInvokeCreator subscriberInvokeCreator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* ioGame
* Copyright (C) 2021 - present 渔民小镇 ([email protected][email protected]) . All Rights Reserved.
* # iohao.com . 渔民小镇
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
/**
* <a href="https://www.yuque.com/iohao/game/gmxz33">分布式事件总线相关文档</a>,分布式事件总线与 Guava EventBus、Redis 发布订阅、MQ 等产品类似。
* <pre>
* 如果使用 Redis、MQ ...等中间件,需要开发者额外的安装这些中间件,并支付所占用机器的费用;使用 Guava EventBus 则只能在当前进程中通信,无法实现跨进程。
* 而 ioGame 提供的分布式事件总线,拥有上述两者的优点。此外,还可以有效的帮助企业节省云上 Redis、 MQ 这部分的支出。
* 事件发布后,除了当前进程所有的订阅者能接收到,远程的订阅者也能接收到(支持跨机器、跨进程、跨不同类型的多个逻辑服)。可以代替 redis pub sub 、 MQ ,并且具备全链路调用日志跟踪,这点是中间件产品做不到的。
* </pre>
* <pre>
* ioGame 分布式事件总线,特点:
* 使用方式与 Guava EventBus 类似
* 具备全链路调用日志跟踪。(这点是中间件产品做不到的)
* 支持跨多个机器、多个进程通信
* 支持与多种不同类型的多个逻辑服通信
* 纯 javaSE,不依赖其他服务,耦合性低。(不需要安装任何中间件)
* 事件源和事件监听器之间通过事件进行通信,从而实现了模块之间的解耦
* 当没有任何远程订阅者时,将不会触发网络请求。(这点是中间件产品做不到的)
* </pre>
* <pre>
* ioGame 提供的分布式事件总线在使用上是简单的,只有 3 个概念,分别是:
* 1. 事件源:事件源由开发者定义。
* 2. 订阅者:订阅者由开发者定义。
* 3. 发布事件
* </pre>
* <p>
* for example,下面示例展示了分布式事件总线的开启、注册订阅者、定义事件源、发布事件
* <pre>{@code
* // 通过业务框架的 addRunner 方法来扩展分布式事件总线相关内容 (Runner 扩展机制),我们将 UserLoginEventMessage、EmailEventBusSubscriber 注册到 EventBus 中。
* public class MyLogicStartup extends AbstractBrokerClientStartup {
* ... ...省略部分代码
*
* @Override
* public BarSkeleton createBarSkeleton() {
* // 业务框架构建器
* var builder = ...
*
* // 开启分布式事件总线。逻辑服添加 EventBusRunner,用于处理 EventBus 相关业务
* builder.addRunner(new EventBusRunner() {
* @Override
* public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
* // 注册订阅者
* eventBus.register(new EmailEventBusSubscriber());
* eventBus.register(new UserLoginEventMessage());
* }
* });
*
* return builder.build();
* }
* }
*
* // 事件源由开发者定义。事件源是业务数据载体等,其主要目的是用于业务数据的传输。
* @Data
* public class UserLoginEventMessage implements Serializable {
* final long userId;
*
* public UserLoginEventMessage(long userId) {
* this.userId = userId;
* }
* }
*
* // 订阅者由开发者定义。
* // 我们在 EmailEventBusSubscriber、UserEventBusSubscriber 类中,分别提供了 UserLoginEventMessage 事件源的订阅者 mail、userLogin。
* // 当有 UserLoginEventMessage 相关的事件触发,订阅者都能接收到事件。别忘记,当前介绍的是分布式事件总线;所以,即使订阅者在不同的进程中,也能接收到事件。
* // 另外,值得称赞的是,如果没有任何远程订阅者,将不会触发网络请求。简单的说,事件发布后,当其他进程(其他机器)没有相关订阅者时,只会在内存中传递事件给当前进程的相关订阅者。所以,可以将该通讯方式当作 guava EventBus 来使用。
* @EventBusSubscriber
* public class EmailEventBusSubscriber {
* @EventSubscribe
* public void mail(UserLoginEventMessage message) {
* long userId = message.getUserId();
* log.info("event - 玩家[{}]登录,发放 email 奖励", userId);
* }
* }
*
* @EventBusSubscriber
* public class UserEventBusSubscriber {
* @EventSubscribe
* public void userLogin(UserLoginEventMessage message) {
* log.info("event - 玩家[{}]登录,记录登录时间", message.getUserId());
* }
* }
*
* // 发布事件
* @ActionController(UserCmd.cmd)
* public class UserAction {
* @ActionMethod(UserCmd.fireEvent)
* public void fireEventUser(FlowContext flowContext) {
* long userId = flowContext.getUserId();
* // 事件源
* var message = new UserLoginEventMessage(userId);
* // 发布事件
* flowContext.fire(message);
* // 事件发布后,会被 UserEventBusSubscriber、EmailEventBusSubscriber 接收。
* }
* }
*
* }</pre>
*
* @author 渔民小镇
* @date 2024-06-06
* @see com.iohao.game.action.skeleton.core.runner.Runner
*/
package com.iohao.game.action.skeleton.eventbus;
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public void fireAny() {

sleep();
Assert.assertEquals(6, CustomEvent.myMessageLong.sum());


}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,4 @@
@FieldDefaults(level = AccessLevel.PRIVATE)
public class MyMessage {
String name;


}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public class TaskKit {
/** 虚拟线程执行器 */
@Getter
final ExecutorService virtualExecutor = ExecutorKit.newVirtualExecutor("ioGameVirtual-");
final SetMultiMap<TickTimeUnit, IntervalTaskListener> intervalTaskListenerMap = SetMultiMap.create();
final SetMultiMap<TickTimeUnit, IntervalTaskListener> intervalTaskListenerMap = SetMultiMap.of();

record TickTimeUnit(long tick, TimeUnit timeUnit) {
}
Expand Down

0 comments on commit e8afd4a

Please sign in to comment.