Skip to content

Commit

Permalink
Fix the problem of not being able to initialize preset plugins (#5874)
Browse files Browse the repository at this point in the history
#### What type of PR is this?

/kind bug
/area core
/area plugin
/milestone 2.16.x

#### What this PR does / why we need it:

This PR refactors plugin running state change method to resolve the problem of not being able to initialize preset plugins due to too small gap between installation and enabling.

#### Which issue(s) this PR fixes:

Fixes #5867

#### Does this PR introduce a user-facing change?

```release-note
解决初始化时无法正常启用插件的问题
```
  • Loading branch information
JohnNiang committed May 10, 2024
1 parent dc451e2 commit a629961
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,45 +283,56 @@ Mono<ServerResponse> changePluginRunningState(ServerRequest request) {
final var name = request.pathVariable("name");
return request.bodyToMono(RunningStateRequest.class)
.flatMap(runningState -> {
final var enable = runningState.isEnable();
return client.get(Plugin.class, name)
var enable = runningState.isEnable();
var updatedPlugin = Mono.defer(() -> client.get(Plugin.class, name))
.flatMap(plugin -> {
plugin.getSpec().setEnabled(enable);
return client.update(plugin);
})
.flatMap(plugin -> {
if (runningState.isAsync()) {
return Mono.just(plugin);
if (!Objects.equals(enable, plugin.getSpec().getEnabled())) {
plugin.getSpec().setEnabled(enable);
log.debug("Updating plugin {} state to {}", name, enable);
return client.update(plugin);
}
return waitForPluginToMeetExpectedState(name, p -> {
// when enabled = true,excepted phase = started || failed
// when enabled = false,excepted phase = !started
var phase = p.statusNonNull().getPhase();
log.debug("Checking plugin {} state, no need to update", name);
return Mono.just(plugin);
});

var async = runningState.isAsync();
if (!async) {
// if we want to wait the state of plugin to be updated
updatedPlugin = updatedPlugin
.flatMap(plugin -> {
var phase = plugin.statusNonNull().getPhase();
if (enable) {
return Plugin.Phase.STARTED.equals(phase)
|| Plugin.Phase.FAILED.equals(phase);
// if we request to enable the plugin
if (!(Plugin.Phase.STARTED.equals(phase)
|| Plugin.Phase.FAILED.equals(phase))) {
return Mono.error(UnexpectedPluginStateException::new);
}
} else {
// if we request to disable the plugin
if (Plugin.Phase.STARTED.equals(phase)) {
return Mono.error(UnexpectedPluginStateException::new);
}
}
return !Plugin.Phase.STARTED.equals(phase);
return Mono.just(plugin);
})
.retryWhen(
Retry.backoff(10, Duration.ofMillis(100))
.filter(UnexpectedPluginStateException.class::isInstance)
.doBeforeRetry(signal ->
log.debug("Waiting for plugin {} to meet expected state", name)
)
)
.doOnSuccess(plugin -> {
log.info("Plugin {} met expected state {}",
name, plugin.statusNonNull().getPhase());
});
});
}

return updatedPlugin;
})
.flatMap(plugin -> ServerResponse.ok().bodyValue(plugin));
}

Mono<Plugin> waitForPluginToMeetExpectedState(String name, Predicate<Plugin> predicate) {
return Mono.defer(() -> client.get(Plugin.class, name)
.map(plugin -> {
if (predicate.test(plugin)) {
return plugin;
}
throw new IllegalStateException("Plugin " + name + " is not in expected state");
})
)
.retryWhen(Retry.backoff(10, Duration.ofMillis(100))
.filter(IllegalStateException.class::isInstance)
);
}

@Data
@Schema(name = "PluginRunningStateRequest")
static class RunningStateRequest {
Expand Down Expand Up @@ -871,4 +882,8 @@ public void destroy() throws Exception {
this.cssBundle.set(null);
}
}

private static class UnexpectedPluginStateException extends IllegalStateException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,17 @@ private void syncPluginState(Plugin plugin) {

private static boolean requestToReload(Plugin plugin) {
var annotations = plugin.getMetadata().getAnnotations();
return annotations != null && annotations.remove(RELOAD_ANNO) != null;
return annotations != null && annotations.get(RELOAD_ANNO) != null;
}

private static void removeRequestToReload(Plugin plugin) {
var annotations = plugin.getMetadata().getAnnotations();
if (annotations != null) {
annotations.remove(RELOAD_ANNO);
}
}


private void cleanupResources(Plugin plugin) {
var pluginName = plugin.getMetadata().getName();
var reverseProxyName = buildReverseProxyName(pluginName);
Expand Down Expand Up @@ -394,6 +402,8 @@ private void loadOrReload(Plugin plugin) {
}
p = pluginManager.getPlugin(pluginName);
}
// ensure removing the reload annotation after the plugin is reloaded
removeRequestToReload(plugin);
}
if (p != null && pluginManager.getUnresolvedPlugins().contains(p)) {
pluginManager.unloadPlugin(pluginName);
Expand Down Expand Up @@ -586,6 +596,7 @@ public void pluginStateChanged(PluginStateEvent event) {
client.fetch(Plugin.class, pluginId)
.ifPresent(plugin -> {
if (!Objects.equals(true, plugin.getSpec().getEnabled())) {
log.info("Observed plugin {} started, enabling it.", pluginId);
plugin.getSpec().setEnabled(true);
client.update(plugin);
}
Expand All @@ -604,6 +615,7 @@ public void pluginStateChanged(PluginStateEvent event) {
.ifPresent(plugin -> {
if (!requestToReload(plugin)
&& Objects.equals(true, plugin.getSpec().getEnabled())) {
log.info("Observed plugin {} stopped, disabling it.", pluginId);
plugin.getSpec().setEnabled(false);
client.update(plugin);
}
Expand Down
22 changes: 21 additions & 1 deletion application/src/main/java/run/halo/app/plugin/SpringPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,36 @@ public SpringPlugin(PluginApplicationContextFactory contextFactory,

@Override
public void start() {
log.info("Preparing starting plugin {}", pluginContext.getName());
var pluginId = pluginContext.getName();
try {
// initialize context
var pluginId = pluginContext.getName();
this.context = contextFactory.create(pluginId);
log.info("Application context {} for plugin {} is created", this.context, pluginId);

var pluginOpt = context.getBeanProvider(Plugin.class)
.stream()
.findFirst();
log.info("Before publishing plugin starting event for plugin {}", pluginId);
context.publishEvent(new SpringPluginStartingEvent(this, this));
log.info("After publishing plugin starting event for plugin {}", pluginId);
if (pluginOpt.isPresent()) {
this.delegate = pluginOpt.get();
if (this.delegate instanceof BasePlugin basePlugin) {
basePlugin.setContext(pluginContext);
}
log.info("Starting {} for plugin {}", this.delegate, pluginId);
this.delegate.start();
log.info("Started {} for plugin {}", this.delegate, pluginId);
}
log.info("Before publishing plugin started event for plugin {}", pluginId);
context.publishEvent(new SpringPluginStartedEvent(this, this));
log.info("After publishing plugin started event for plugin {}", pluginId);
} catch (Throwable t) {
// try to stop plugin for cleaning resources if something went wrong
log.error(
"Cleaning up plugin resources for plugin {} due to not being able to start plugin.",
pluginId);
this.stop();
// propagate exception to invoker.
throw t;
Expand All @@ -54,16 +65,25 @@ public void start() {
public void stop() {
try {
if (context != null) {
log.info("Before publishing plugin stopping event for plugin {}",
pluginContext.getName());
context.publishEvent(new SpringPluginStoppingEvent(this, this));
log.info("After publishing plugin stopping event for plugin {}",
pluginContext.getName());
}
if (this.delegate != null) {
log.info("Stopping {} for plugin {}", this.delegate, pluginContext.getName());
this.delegate.stop();
log.info("Stopped {} for plugin {}", this.delegate, pluginContext.getName());
}
} finally {
if (context instanceof ConfigurableApplicationContext configurableContext) {
log.info("Closing plugin context for plugin {}", pluginContext.getName());
configurableContext.close();
log.info("Closed plugin context for plugin {}", pluginContext.getName());
}
// reset application context
log.info("Reset plugin context for plugin {}", pluginContext.getName());
context = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

import static java.util.Objects.requireNonNull;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.reactive.server.WebTestClient.bindToRouterFunction;
Expand Down Expand Up @@ -474,4 +477,70 @@ void writeAndGetCssResourceTest() {
.verifyComplete();
}
}

@Nested
class PluginStateChangeTest {

WebTestClient webClient;

@BeforeEach
void setUp() {
webClient = WebTestClient.bindToRouterFunction(endpoint.endpoint())
.build();
}

@Test
void shouldEnablePluginIfPluginWasNotStarted() {
var plugin = createPlugin("fake-plugin");
plugin.getSpec().setEnabled(false);
plugin.statusNonNull().setPhase(Plugin.Phase.RESOLVED);

when(client.get(Plugin.class, "fake-plugin")).thenReturn(Mono.just(plugin))
.thenReturn(Mono.fromSupplier(() -> {
plugin.statusNonNull().setPhase(Plugin.Phase.STARTED);
return plugin;
}));
when(client.update(plugin)).thenReturn(Mono.just(plugin));

var requestBody = new PluginEndpoint.RunningStateRequest();
requestBody.setEnable(true);
requestBody.setAsync(false);
webClient.put().uri("/plugins/fake-plugin/plugin-state")
.bodyValue(requestBody)
.exchange()
.expectStatus().isOk()
.expectBody(Plugin.class)
.value(p -> assertTrue(p.getSpec().getEnabled()));

verify(client, times(2)).get(Plugin.class, "fake-plugin");
verify(client).update(plugin);
}

@Test
void shouldDisablePluginIfAlreadyStarted() {
var plugin = createPlugin("fake-plugin");
plugin.getSpec().setEnabled(true);
plugin.statusNonNull().setPhase(Plugin.Phase.STARTED);

when(client.get(Plugin.class, "fake-plugin")).thenReturn(Mono.just(plugin))
.thenReturn(Mono.fromSupplier(() -> {
plugin.getStatus().setPhase(Plugin.Phase.STOPPED);
return plugin;
}));
when(client.update(plugin)).thenReturn(Mono.just(plugin));

var requestBody = new PluginEndpoint.RunningStateRequest();
requestBody.setEnable(false);
requestBody.setAsync(false);
webClient.put().uri("/plugins/fake-plugin/plugin-state")
.bodyValue(requestBody)
.exchange()
.expectStatus().isOk()
.expectBody(Plugin.class)
.value(p -> assertFalse(p.getSpec().getEnabled()));

verify(client, times(2)).get(Plugin.class, "fake-plugin");
verify(client).update(plugin);
}
}
}

0 comments on commit a629961

Please sign in to comment.