Skip to content

Commit

Permalink
Support reactive-styles plugin extension lifecycle hooks to support f…
Browse files Browse the repository at this point in the history
…lexible implementation and improve performance
  • Loading branch information
JamesChenX committed Dec 3, 2023
1 parent 1853430 commit 5b70236
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public AntiSpamHandler(AntiSpamProperties properties) {
}

@Override
protected void onStarted() {
protected Mono<Void> onStarted() {
AntiSpamProperties properties = loadProperties(AntiSpamProperties.class);
enabled = properties.isEnabled();
textPreprocessor = new TextPreprocessor(properties.getTextParsingStrategy());
Expand All @@ -95,6 +95,7 @@ protected void onStarted() {
: null;
textTypeToProperties = createTextTypeToPropertiesMap(properties.getTextTypes(),
properties.getSilentIllegalTextTypes());
return Mono.empty();
}

private Map<TurmsRequest.KindCase, TextTypeProperties> createTextTypeToPropertiesMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.springframework.context.ApplicationContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

import im.turms.plugin.minio.core.BucketPolicy;
import im.turms.plugin.minio.core.BucketPolicyAction;
Expand Down Expand Up @@ -85,6 +86,8 @@
import im.turms.server.common.infra.security.MacUtil;
import im.turms.server.common.infra.time.DateRange;
import im.turms.server.common.infra.time.DurationConst;
import im.turms.server.common.infra.tracing.TracingCloseableContext;
import im.turms.server.common.infra.tracing.TracingContext;
import im.turms.server.common.storage.mongo.TurmsMongoClient;
import im.turms.service.domain.group.service.GroupMemberService;
import im.turms.service.domain.storage.bo.StorageResourceInfo;
Expand Down Expand Up @@ -122,8 +125,10 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora
* @implNote 1. We use HMAC(key, message) instead of a HASH(key + message) to avoid the length
* extension attack. To put simply, if a hacker knows the signature of the resource
* "1", and he can also know the signature of resource "12", "13", "123", and so on
* without knowledge of the key. 2. Use MD5 because its output size (128 bits) is
* small, and it is a 22-character Base62-encoded string.
* without knowledge of the key.
* <p>
* 2. Use MD5 because its output size (128 bits) is small, and it is a 22-character
* Base62-encoded string.
*/
private boolean isMacEnabled;
@Nullable
Expand Down Expand Up @@ -156,30 +161,30 @@ public class MinioStorageServiceProvider extends TurmsExtension implements Stora
}

@Override
public void onStarted() {
setUp();
public Mono<Void> onStarted() {
return setUp();
}

private void setUp() {
private Mono<Void> setUp() {
MinioStorageProperties properties = loadProperties(MinioStorageProperties.class);
if (!properties.isEnabled()) {
return;
return Mono.empty();
}
String endpoint = properties.getEndpoint();
URI uri;
try {
uri = new URI(endpoint);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"Illegal endpoint URL: "
+ endpoint,
e);
e));
}
if (!uri.isAbsolute()) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"The endpoint URL ("
+ endpoint
+ ") must be absolute");
+ ") must be absolute"));
}
ApplicationContext context = getContext();
node = context.getBean(Node.class);
Expand Down Expand Up @@ -217,15 +222,15 @@ private void setUp() {
key = Base64.getDecoder()
.decode(base64Key);
} catch (Exception e) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"The HMAC key must be Base64-encoded, but got: "
+ base64Key,
e);
e));
}
if (key.length < 16) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"The length of HMAC key must be greater than or equal to 16, but got: "
+ key.length);
+ key.length));
}
macKey = new SecretKeySpec(key, "HmacMD5");
} else {
Expand All @@ -235,42 +240,32 @@ private void setUp() {
properties.getRegion(),
properties.getAccessKey(),
properties.getSecretKey());
Duration timeout = Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS);
try {
initBuckets().block(timeout);
isServing = true;
} catch (Exception e) {
MinioStorageProperties.Retry retry = properties.getRetry();
int maxAttempts = retry.getMaxAttempts();
if (!retry.isEnabled() || maxAttempts <= 0) {
throw new RuntimeException("Failed to initialize the MinIO client", e);
}
LOGGER.error("Failed to initialize the MinIO client. Retry times: 0", e);
try {
Thread.sleep(retry.getInitialIntervalMillis());
} catch (InterruptedException ex) {
throw new RuntimeException("Failed to initialize the MinIO client", e);
}
for (int currentRetryTimes = 1; currentRetryTimes <= maxAttempts; currentRetryTimes++) {
try {
initBuckets().block(timeout);
} catch (Exception ex) {
LOGGER.error("Failed to initialize the MinIO client. Retry times: "
+ currentRetryTimes, ex);
if (currentRetryTimes == maxAttempts) {
throw new RuntimeException(
"Failed to initialize the MinIO client with retries exhausted: "
+ maxAttempts);

Mono<Void> initBuckets =
initBuckets().timeout(Duration.ofSeconds(INIT_BUCKETS_TIMEOUT_SECONDS))
.doOnSuccess(unused -> isServing = true);

MinioStorageProperties.Retry retry = properties.getRetry();
int maxAttempts = retry.getMaxAttempts();
if (!retry.isEnabled() || maxAttempts <= 0) {
return initBuckets.onErrorMap(t -> true,
t -> new RuntimeException("Failed to initialize the MinIO client", t));
}
return initBuckets.retryWhen(Retry.max(maxAttempts)
.doBeforeRetryAsync(retrySignal -> Mono.deferContextual(contextView -> {
long totalRetries = retrySignal.totalRetries();
try (TracingCloseableContext ignored =
TracingContext.getCloseableContext(contextView)) {
LOGGER.error("Failed to initialize the MinIO client. Retry times: "
+ totalRetries, retrySignal.failure());
}
try {
Thread.sleep(retry.getIntervalMillis());
} catch (InterruptedException ignored) {
throw new RuntimeException("Failed to initialize the MinIO client", ex);
if (0 == totalRetries) {
return Mono.delay(Duration.ofMillis(retry.getInitialIntervalMillis()))
.then();
}
}
}
isServing = true;
}
return Mono.delay(Duration.ofMillis(retry.getIntervalMillis()))
.then();
})));
}

private void initClient(String endpoint, String region, String accessKey, String secretKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import im.turms.server.common.infra.logging.core.logger.Logger;
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
import im.turms.server.common.infra.plugin.TurmsExtension;
import im.turms.server.common.infra.time.DurationConst;
import im.turms.server.common.infra.tracing.TracingCloseableContext;
import im.turms.server.common.infra.tracing.TracingContext;
import im.turms.service.access.servicerequest.dto.RequestHandlerResult;
Expand All @@ -59,19 +58,19 @@ public class NotificationPusher extends TurmsExtension implements RequestHandler
private List<String> deviceTokenFieldNames;

@Override
protected void onStarted() {
protected Mono<Void> onStarted() {
PushNotificationProperties properties = loadProperties(PushNotificationProperties.class);
manager = new PushNotificationManager(properties);
userService = getContext().getBean(UserService.class);
userStatusService = getContext().getBean(UserStatusService.class);

deviceTokenFieldNames = manager.getDeviceTokenFieldNames();
return Mono.empty();
}

@Override
protected void onStopped() {
manager.close()
.block(DurationConst.ONE_MINUTE);
protected Mono<Void> onStopped() {
return manager.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ public class RasaResponser extends TurmsExtension implements RequestHandlerResul
private Map<Long, RasaClientInfo> idToClientInfo;

@Override
public void onStarted() {
setUp();
public Mono<Void> onStarted() {
return setUp();
}

private void setUp() {
private Mono<Void> setUp() {
RasaProperties properties = loadProperties(RasaProperties.class);
if (!properties.isEnabled()
|| properties.getInstanceFindStrategy() != InstanceFindStrategy.PROPERTY) {
return;
return Mono.empty();
}
List<RasaProperties.InstanceProperties> instancePropertiesList = properties.getInstances();
if (instancePropertiesList.isEmpty()) {
return;
return Mono.empty();
}
int size = instancePropertiesList.size();
Map<URI, RasaClientInfo> uriToClientInfo = CollectionUtil.newMapWithExpectedSize(size);
Expand All @@ -87,10 +87,10 @@ private void setUp() {
try {
uri = new URI(url);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"Illegal endpoint URL: "
+ url,
e);
e));
}
int requestTimeoutMillis = instanceProperties.getRequest()
.getTimeoutMillis();
Expand All @@ -101,15 +101,16 @@ private void setUp() {
Long chatbotUserId = instanceProperties.getChatbotUserId();
RasaClientInfo existingClientInfo = idToClientInfo.put(chatbotUserId, newClientInfo);
if (existingClientInfo != null) {
throw new IllegalArgumentException(
return Mono.error(new IllegalArgumentException(
"Found a duplicate chatbot user ID: "
+ chatbotUserId);
+ chatbotUserId));
}
}
this.idToClientInfo = Map.copyOf(idToClientInfo);
chatbotUserIds = CollectionUtil.toImmutableSet(idToClientInfo.keySet());
ApplicationContext context = getContext();
messageService = context.getBean(MessageService.class);
return Mono.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,19 @@ public static HttpHandlerResult<ResponseDTO<UpdateResultDTO>> updateResult(long
return okIfTruthy(new UpdateResultDTO(modifiedCount, modifiedCount));
}

public static Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updateResultByIntegerMono(
Mono<Integer> data) {
return okIfTruthy(data.map(number -> {
Long count = number.longValue();
return new UpdateResultDTO(count, count);
}));
}

public static Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updateResultByLongMono(
Mono<Long> data) {
return okIfTruthy(data.map(number -> new UpdateResultDTO(number, number)));
}

public static Mono<HttpHandlerResult<ResponseDTO<DeleteResultDTO>>> deleteResult(
Mono<DeleteResult> data) {
return okIfTruthy(data.map(DeleteResultDTO::get));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.List;
import java.util.Set;

import reactor.core.publisher.Mono;

import im.turms.server.common.access.admin.dto.response.HttpHandlerResult;
import im.turms.server.common.access.admin.dto.response.ResponseDTO;
import im.turms.server.common.access.admin.dto.response.UpdateResultDTO;
Expand Down Expand Up @@ -106,20 +108,20 @@ public HttpHandlerResult<ResponseDTO<Collection<PluginDTO>>> getPlugins(

@PutMapping
@RequiredPermission(AdminPermission.PLUGIN_UPDATE)
public HttpHandlerResult<ResponseDTO<UpdateResultDTO>> updatePlugins(
public Mono<HttpHandlerResult<ResponseDTO<UpdateResultDTO>>> updatePlugins(
Set<String> ids,
@RequestBody UpdatePluginDTO updatePluginDTO) {
UpdatePluginDTO.PluginStatus status = updatePluginDTO.status();
if (status == null) {
return HttpHandlerResult.okIfTruthy(UpdateResultDTO.NONE);
return HttpHandlerResult.okIfTruthy(Mono.just(UpdateResultDTO.NONE));
}
long count = switch (status) {
Mono<Integer> count = switch (status) {
case STARTED -> pluginManager.startPlugins(ids);
case STOPPED -> pluginManager.stopPlugins(ids);
case RESUMED -> pluginManager.resumePlugins(ids);
case PAUSED -> pluginManager.pausePlugins(ids);
};
return HttpHandlerResult.okIfTruthy(new UpdateResultDTO(count, count));
return HttpHandlerResult.updateResultByIntegerMono(count);
}

@PostMapping("java")
Expand Down Expand Up @@ -162,11 +164,10 @@ public HttpHandlerResult<ResponseDTO<Void>> createJsPlugins(

@DeleteMapping
@RequiredPermission(AdminPermission.PLUGIN_DELETE)
public HttpHandlerResult<ResponseDTO<Void>> deletePlugins(
public Mono<HttpHandlerResult<ResponseDTO<Void>>> deletePlugins(
Set<String> ids,
boolean deleteLocalFiles) {
pluginManager.deletePlugins(ids, deleteLocalFiles);
return HttpHandlerResult.RESPONSE_OK;
return HttpHandlerResult.okIfTruthy(pluginManager.deletePlugins(ids, deleteLocalFiles));
}

}
Loading

0 comments on commit 5b70236

Please sign in to comment.