Skip to content

Commit b7f0b49

Browse files
committed
Support reactive-styles plugin extension lifecycle hooks to support flexible implementation and improve performance
1 parent 1853430 commit b7f0b49

File tree

15 files changed

+458
-308
lines changed

15 files changed

+458
-308
lines changed

turms-plugins/turms-plugin-antispam/src/main/java/im/turms/plugin/antispam/AntiSpamHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public AntiSpamHandler(AntiSpamProperties properties) {
8181
}
8282

8383
@Override
84-
protected void onStarted() {
84+
protected Mono<Void> onStarted() {
8585
AntiSpamProperties properties = loadProperties(AntiSpamProperties.class);
8686
enabled = properties.isEnabled();
8787
textPreprocessor = new TextPreprocessor(properties.getTextParsingStrategy());
@@ -95,6 +95,7 @@ protected void onStarted() {
9595
: null;
9696
textTypeToProperties = createTextTypeToPropertiesMap(properties.getTextTypes(),
9797
properties.getSilentIllegalTextTypes());
98+
return Mono.empty();
9899
}
99100

100101
private Map<TurmsRequest.KindCase, TextTypeProperties> createTextTypeToPropertiesMap(

turms-plugins/turms-plugin-minio/src/main/java/im/turms/plugin/minio/MinioStorageServiceProvider.java

+43-48
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.context.ApplicationContext;
4949
import reactor.core.publisher.Flux;
5050
import reactor.core.publisher.Mono;
51+
import reactor.util.retry.Retry;
5152

5253
import im.turms.plugin.minio.core.BucketPolicy;
5354
import im.turms.plugin.minio.core.BucketPolicyAction;
@@ -85,6 +86,8 @@
8586
import im.turms.server.common.infra.security.MacUtil;
8687
import im.turms.server.common.infra.time.DateRange;
8788
import im.turms.server.common.infra.time.DurationConst;
89+
import im.turms.server.common.infra.tracing.TracingCloseableContext;
90+
import im.turms.server.common.infra.tracing.TracingContext;
8891
import im.turms.server.common.storage.mongo.TurmsMongoClient;
8992
import im.turms.service.domain.group.service.GroupMemberService;
9093
import im.turms.service.domain.storage.bo.StorageResourceInfo;
@@ -122,8 +125,10 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora
122125
* @implNote 1. We use HMAC(key, message) instead of a HASH(key + message) to avoid the length
123126
* extension attack. To put simply, if a hacker knows the signature of the resource
124127
* "1", and he can also know the signature of resource "12", "13", "123", and so on
125-
* without knowledge of the key. 2. Use MD5 because its output size (128 bits) is
126-
* small, and it is a 22-character Base62-encoded string.
128+
* without knowledge of the key.
129+
* <p>
130+
* 2. Use MD5 because its output size (128 bits) is small, and it is a 22-character
131+
* Base62-encoded string.
127132
*/
128133
private boolean isMacEnabled;
129134
@Nullable
@@ -156,30 +161,30 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora
156161
}
157162

158163
@Override
159-
public void onStarted() {
160-
setUp();
164+
public Mono<Void> onStarted() {
165+
return setUp();
161166
}
162167

163-
private void setUp() {
168+
private Mono<Void> setUp() {
164169
MinioStorageProperties properties = loadProperties(MinioStorageProperties.class);
165170
if (!properties.isEnabled()) {
166-
return;
171+
return Mono.empty();
167172
}
168173
String endpoint = properties.getEndpoint();
169174
URI uri;
170175
try {
171176
uri = new URI(endpoint);
172177
} catch (URISyntaxException e) {
173-
throw new IllegalArgumentException(
178+
return Mono.error(new IllegalArgumentException(
174179
"Illegal endpoint URL: "
175180
+ endpoint,
176-
e);
181+
e));
177182
}
178183
if (!uri.isAbsolute()) {
179-
throw new IllegalArgumentException(
184+
return Mono.error(new IllegalArgumentException(
180185
"The endpoint URL ("
181186
+ endpoint
182-
+ ") must be absolute");
187+
+ ") must be absolute"));
183188
}
184189
ApplicationContext context = getContext();
185190
node = context.getBean(Node.class);
@@ -217,15 +222,15 @@ private void setUp() {
217222
key = Base64.getDecoder()
218223
.decode(base64Key);
219224
} catch (Exception e) {
220-
throw new IllegalArgumentException(
225+
return Mono.error(new IllegalArgumentException(
221226
"The HMAC key must be Base64-encoded, but got: "
222227
+ base64Key,
223-
e);
228+
e));
224229
}
225230
if (key.length < 16) {
226-
throw new IllegalArgumentException(
231+
return Mono.error(new IllegalArgumentException(
227232
"The length of HMAC key must be greater than or equal to 16, but got: "
228-
+ key.length);
233+
+ key.length));
229234
}
230235
macKey = new SecretKeySpec(key, "HmacMD5");
231236
} else {
@@ -235,42 +240,32 @@ private void setUp() {
235240
properties.getRegion(),
236241
properties.getAccessKey(),
237242
properties.getSecretKey());
238-
Duration timeout = Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS);
239-
try {
240-
initBuckets().block(timeout);
241-
isServing = true;
242-
} catch (Exception e) {
243-
MinioStorageProperties.Retry retry = properties.getRetry();
244-
int maxAttempts = retry.getMaxAttempts();
245-
if (!retry.isEnabled() || maxAttempts <= 0) {
246-
throw new RuntimeException("Failed to initialize the MinIO client", e);
247-
}
248-
LOGGER.error("Failed to initialize the MinIO client. Retry times: 0", e);
249-
try {
250-
Thread.sleep(retry.getInitialIntervalMillis());
251-
} catch (InterruptedException ex) {
252-
throw new RuntimeException("Failed to initialize the MinIO client", e);
253-
}
254-
for (int currentRetryTimes = 1; currentRetryTimes <= maxAttempts; currentRetryTimes++) {
255-
try {
256-
initBuckets().block(timeout);
257-
} catch (Exception ex) {
258-
LOGGER.error("Failed to initialize the MinIO client. Retry times: "
259-
+ currentRetryTimes, ex);
260-
if (currentRetryTimes == maxAttempts) {
261-
throw new RuntimeException(
262-
"Failed to initialize the MinIO client with retries exhausted: "
263-
+ maxAttempts);
243+
244+
Mono<Void> initBuckets =
245+
initBuckets().timeout(Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS))
246+
.doOnSuccess(unused -> isServing = true);
247+
248+
MinioStorageProperties.Retry retry = properties.getRetry();
249+
int maxAttempts = retry.getMaxAttempts();
250+
if (!retry.isEnabled() || maxAttempts <= 0) {
251+
return initBuckets.onErrorMap(t -> true,
252+
t -> new RuntimeException("Failed to initialize the MinIO client", t));
253+
}
254+
return initBuckets.retryWhen(Retry.max(maxAttempts)
255+
.doBeforeRetryAsync(retrySignal -> Mono.deferContextual(contextView -> {
256+
long totalRetries = retrySignal.totalRetries();
257+
try (TracingCloseableContext ignored =
258+
TracingContext.getCloseableContext(contextView)) {
259+
LOGGER.error("Failed to initialize the MinIO client. Retry times: "
260+
+ totalRetries, retrySignal.failure());
264261
}
265-
try {
266-
Thread.sleep(retry.getIntervalMillis());
267-
} catch (InterruptedException ignored) {
268-
throw new RuntimeException("Failed to initialize the MinIO client", ex);
262+
if (0 == totalRetries) {
263+
return Mono.delay(Duration.ofMillis(retry.getInitialIntervalMillis()))
264+
.then();
269265
}
270-
}
271-
}
272-
isServing = true;
273-
}
266+
return Mono.delay(Duration.ofMillis(retry.getIntervalMillis()))
267+
.then();
268+
})));
274269
}
275270

276271
private void initClient(String endpoint, String region, String accessKey, String secretKey) {

turms-plugins/turms-plugin-push/src/main/java/im/turms/plugin/push/NotificationPusher.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import im.turms.server.common.infra.logging.core.logger.Logger;
3939
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
4040
import im.turms.server.common.infra.plugin.TurmsExtension;
41-
import im.turms.server.common.infra.time.DurationConst;
4241
import im.turms.server.common.infra.tracing.TracingCloseableContext;
4342
import im.turms.server.common.infra.tracing.TracingContext;
4443
import im.turms.service.access.servicerequest.dto.RequestHandlerResult;
@@ -59,19 +58,19 @@ public class NotificationPusher extends TurmsExtension implements RequestHandler
5958
private List<String> deviceTokenFieldNames;
6059

6160
@Override
62-
protected void onStarted() {
61+
protected Mono<Void> onStarted() {
6362
PushNotificationProperties properties = loadProperties(PushNotificationProperties.class);
6463
manager = new PushNotificationManager(properties);
6564
userService = getContext().getBean(UserService.class);
6665
userStatusService = getContext().getBean(UserStatusService.class);
6766

6867
deviceTokenFieldNames = manager.getDeviceTokenFieldNames();
68+
return Mono.empty();
6969
}
7070

7171
@Override
72-
protected void onStopped() {
73-
manager.close()
74-
.block(DurationConst.ONE_MINUTE);
72+
protected Mono<Void> onStopped() {
73+
return manager.close();
7574
}
7675

7776
@Override

turms-plugins/turms-plugin-rasa/src/main/java/im/turms/plugin/rasa/RasaResponser.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,19 @@ public class RasaResponser extends TurmsExtension implements RequestHandlerResul
6464
private Map<Long, RasaClientInfo> idToClientInfo;
6565

6666
@Override
67-
public void onStarted() {
68-
setUp();
67+
public Mono<Void> onStarted() {
68+
return setUp();
6969
}
7070

71-
private void setUp() {
71+
private Mono<Void> setUp() {
7272
RasaProperties properties = loadProperties(RasaProperties.class);
7373
if (!properties.isEnabled()
7474
|| properties.getInstanceFindStrategy() != InstanceFindStrategy.PROPERTY) {
75-
return;
75+
return Mono.empty();
7676
}
7777
List<RasaProperties.InstanceProperties> instancePropertiesList = properties.getInstances();
7878
if (instancePropertiesList.isEmpty()) {
79-
return;
79+
return Mono.empty();
8080
}
8181
int size = instancePropertiesList.size();
8282
Map<URI, RasaClientInfo> uriToClientInfo = CollectionUtil.newMapWithExpectedSize(size);
@@ -87,10 +87,10 @@ private void setUp() {
8787
try {
8888
uri = new URI(url);
8989
} catch (URISyntaxException e) {
90-
throw new IllegalArgumentException(
90+
return Mono.error(new IllegalArgumentException(
9191
"Illegal endpoint URL: "
9292
+ url,
93-
e);
93+
e));
9494
}
9595
int requestTimeoutMillis = instanceProperties.getRequest()
9696
.getTimeoutMillis();
@@ -101,15 +101,16 @@ private void setUp() {
101101
Long chatbotUserId = instanceProperties.getChatbotUserId();
102102
RasaClientInfo existingClientInfo = idToClientInfo.put(chatbotUserId, newClientInfo);
103103
if (existingClientInfo != null) {
104-
throw new IllegalArgumentException(
104+
return Mono.error(new IllegalArgumentException(
105105
"Found a duplicate chatbot user ID: "
106-
+ chatbotUserId);
106+
+ chatbotUserId));
107107
}
108108
}
109109
this.idToClientInfo = Map.copyOf(idToClientInfo);
110110
chatbotUserIds = CollectionUtil.toImmutableSet(idToClientInfo.keySet());
111111
ApplicationContext context = getContext();
112112
messageService = context.getBean(MessageService.class);
113+
return Mono.empty();
113114
}
114115

115116
@Override

turms-server-common/src/main/java/im/turms/server/common/access/admin/dto/response/HttpHandlerResult.java

+13
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,19 @@ public static HttpHandlerResult<ResponseDTO<UpdateResultDTO>> updateResult(long
143143
return okIfTruthy(new UpdateResultDTO(modifiedCount, modifiedCount));
144144
}
145145

146+
public static Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updateResultByIntegerMono(
147+
Mono<Integer> data) {
148+
return okIfTruthy(data.map(number -> {
149+
Long count = number.longValue();
150+
return new UpdateResultDTO(count, count);
151+
}));
152+
}
153+
154+
public static Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updateResultByLongMono(
155+
Mono<Long> data) {
156+
return okIfTruthy(data.map(number -> new UpdateResultDTO(number, number)));
157+
}
158+
146159
public static Mono<HttpHandlerResult<ResponseDTO<DeleteResultDTO>>> deleteResult(
147160
Mono<DeleteResult> data) {
148161
return okIfTruthy(data.map(DeleteResultDTO::get));

turms-server-common/src/main/java/im/turms/server/common/domain/plugin/access/admin/controller/PluginController.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.List;
2323
import java.util.Set;
2424

25+
import reactor.core.publisher.Mono;
26+
2527
import im.turms.server.common.access.admin.dto.response.HttpHandlerResult;
2628
import im.turms.server.common.access.admin.dto.response.ResponseDTO;
2729
import im.turms.server.common.access.admin.dto.response.UpdateResultDTO;
@@ -106,20 +108,20 @@ public HttpHandlerResult<ResponseDTO<Collection<PluginDTO>>> getPlugins(
106108

107109
@PutMapping
108110
@RequiredPermission(AdminPermission.PLUGIN_UPDATE)
109-
public HttpHandlerResult<ResponseDTO<UpdateResultDTO>> updatePlugins(
111+
public Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updatePlugins(
110112
Set<String> ids,
111113
@RequestBody UpdatePluginDTO updatePluginDTO) {
112114
UpdatePluginDTO.PluginStatus status = updatePluginDTO.status();
113115
if (status == null) {
114-
return HttpHandlerResult.okIfTruthy(UpdateResultDTO.NONE);
116+
return HttpHandlerResult.okIfTruthy(Mono.just(UpdateResultDTO.NONE));
115117
}
116-
long count = switch (status) {
118+
Mono<Integer> count = switch (status) {
117119
case STARTED -> pluginManager.startPlugins(ids);
118120
case STOPPED -> pluginManager.stopPlugins(ids);
119121
case RESUMED -> pluginManager.resumePlugins(ids);
120122
case PAUSED -> pluginManager.pausePlugins(ids);
121123
};
122-
return HttpHandlerResult.okIfTruthy(new UpdateResultDTO(count, count));
124+
return HttpHandlerResult.updateResultByIntegerMono(count);
123125
}
124126

125127
@PostMapping("java")
@@ -162,11 +164,10 @@ public HttpHandlerResult<ResponseDTO<Void>> createJsPlugins(
162164

163165
@DeleteMapping
164166
@RequiredPermission(AdminPermission.PLUGIN_DELETE)
165-
public HttpHandlerResult<ResponseDTO<Void>> deletePlugins(
167+
public Mono<HttpHandlerResult<ResponseDTO<Void>>> deletePlugins(
166168
Set<String> ids,
167169
boolean deleteLocalFiles) {
168-
pluginManager.deletePlugins(ids, deleteLocalFiles);
169-
return HttpHandlerResult.RESPONSE_OK;
170+
return HttpHandlerResult.okIfTruthy(pluginManager.deletePlugins(ids, deleteLocalFiles));
170171
}
171172

172173
}

0 commit comments

Comments
 (0)