Skip to content

Commit

Permalink
Support for async/reactive close methods (e.g. R2DBC)
Browse files Browse the repository at this point in the history
Closes gh-26991
  • Loading branch information
jhoeller committed Jun 2, 2023
1 parent 2685a35 commit 322cbca
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 18 deletions.
1 change: 1 addition & 0 deletions spring-beans/spring-beans.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
optional("org.apache.groovy:groovy-xml")
optional("org.jetbrains.kotlin:kotlin-reflect")
optional("org.jetbrains.kotlin:kotlin-stdlib")
optional("org.reactivestreams:reactive-streams")
testImplementation(testFixtures(project(":spring-core")))
testImplementation(project(":spring-core-test"))
testImplementation("jakarta.annotation:jakarta.annotation-api")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -65,8 +72,12 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable {

private static final String SHUTDOWN_METHOD_NAME = "shutdown";


private static final Log logger = LogFactory.getLog(DisposableBeanAdapter.class);

private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", DisposableBeanAdapter.class.getClassLoader());


private final Object bean;

Expand Down Expand Up @@ -240,7 +251,7 @@ else if (this.destroyMethods != null) {
}
}
else if (this.destroyMethodNames != null) {
for (String destroyMethodName: this.destroyMethodNames) {
for (String destroyMethodName : this.destroyMethodNames) {
Method destroyMethod = determineDestroyMethod(destroyMethodName);
if (destroyMethod != null) {
invokeCustomDestroyMethod(
Expand Down Expand Up @@ -287,32 +298,40 @@ private Method findDestroyMethod(Class<?> clazz, String name) {
* assuming a "force" parameter), else logging an error.
*/
private void invokeCustomDestroyMethod(Method destroyMethod) {
if (logger.isTraceEnabled()) {
logger.trace("Invoking custom destroy method '" + destroyMethod.getName() +
"' on bean with name '" + this.beanName + "': " + destroyMethod);
}

int paramCount = destroyMethod.getParameterCount();
final Object[] args = new Object[paramCount];
Object[] args = new Object[paramCount];
if (paramCount == 1) {
args[0] = Boolean.TRUE;
}
if (logger.isTraceEnabled()) {
logger.trace("Invoking custom destroy method '" + destroyMethod.getName() +
"' on bean with name '" + this.beanName + "'");
}

try {
ReflectionUtils.makeAccessible(destroyMethod);
destroyMethod.invoke(this.bean, args);
}
catch (InvocationTargetException ex) {
if (logger.isWarnEnabled()) {
String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" +
this.beanName + "' threw an exception";
Object returnValue = destroyMethod.invoke(this.bean, args);

if (returnValue == null) {
// Regular case: a void method
logDestroyMethodCompletion(destroyMethod, false);
}
else if (returnValue instanceof Future<?> future) {
// An async task: await its completion.
future.get();
logDestroyMethodCompletion(destroyMethod, true);
}
else if (!reactiveStreamsPresent || !new ReactiveDestroyMethodHandler().await(destroyMethod, returnValue)) {
if (logger.isDebugEnabled()) {
// Log at warn level like below but add the exception stacktrace only with debug level
logger.warn(msg, ex.getTargetException());
}
else {
logger.warn(msg + ": " + ex.getTargetException());
logger.debug("Unknown return value type from custom destroy method '" + destroyMethod.getName() +
"' on bean with name '" + this.beanName + "': " + returnValue.getClass());
}
}
}
catch (InvocationTargetException | ExecutionException ex) {
logDestroyMethodException(destroyMethod, ex.getCause());
}
catch (Throwable ex) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to invoke custom destroy method '" + destroyMethod.getName() +
Expand All @@ -321,6 +340,27 @@ private void invokeCustomDestroyMethod(Method destroyMethod) {
}
}

void logDestroyMethodException(Method destroyMethod, Throwable ex) {
if (logger.isWarnEnabled()) {
String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" +
this.beanName + "' propagated an exception";
if (logger.isDebugEnabled()) {
// Log at warn level like below but add the exception stacktrace only with debug level
logger.warn(msg, ex);
}
else {
logger.warn(msg + ": " + ex);
}
}
}

void logDestroyMethodCompletion(Method destroyMethod, boolean async) {
if (logger.isDebugEnabled()) {
logger.debug("Custom destroy method '" + destroyMethod.getName() +
"' on bean with name '" + this.beanName + "' completed" + (async ? " asynchronously" : ""));
}
}


/**
* Serializes a copy of the state of this class,
Expand Down Expand Up @@ -443,4 +483,59 @@ private static List<DestructionAwareBeanPostProcessor> filterPostProcessors(
return filteredPostProcessors;
}


/**
* Inner class to avoid a hard dependency on the Reactive Streams API at runtime.
*/
private class ReactiveDestroyMethodHandler {

public boolean await(Method destroyMethod, Object returnValue) throws InterruptedException {
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnValue.getClass());
if (adapter != null) {
CountDownLatch latch = new CountDownLatch(1);
adapter.toPublisher(returnValue).subscribe(new DestroyMethodSubscriber(destroyMethod, latch));
latch.await();
return true;
}
return false;
}
}


/**
* Reactive Streams Subscriber for destroy method completion.
*/
private class DestroyMethodSubscriber implements Subscriber<Object> {

private final Method destroyMethod;

private final CountDownLatch latch;

public DestroyMethodSubscriber(Method destroyMethod, CountDownLatch latch) {
this.destroyMethod = destroyMethod;
this.latch = latch;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}

@Override
public void onNext(Object o) {
}

@Override
public void onError(Throwable t) {
this.latch.countDown();
logDestroyMethodException(this.destroyMethod, t);
}

@Override
public void onComplete() {
this.latch.countDown();
logDestroyMethodCompletion(this.destroyMethod, true);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,10 @@
package org.springframework.context.annotation;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ConfigurableApplicationContext;
Expand Down Expand Up @@ -47,6 +49,8 @@ public void beanMethods() {
WithInheritedCloseMethod c8 = ctx.getBean("c8", WithInheritedCloseMethod.class);
WithDisposableBean c9 = ctx.getBean("c9", WithDisposableBean.class);
WithAutoCloseable c10 = ctx.getBean("c10", WithAutoCloseable.class);
WithCompletableFutureMethod c11 = ctx.getBean("c11", WithCompletableFutureMethod.class);
WithReactorMonoMethod c12 = ctx.getBean("c12", WithReactorMonoMethod.class);

assertThat(c0.closed).as("c0").isFalse();
assertThat(c1.closed).as("c1").isFalse();
Expand All @@ -59,6 +63,8 @@ public void beanMethods() {
assertThat(c8.closed).as("c8").isFalse();
assertThat(c9.closed).as("c9").isFalse();
assertThat(c10.closed).as("c10").isFalse();
assertThat(c11.closed).as("c11").isFalse();
assertThat(c12.closed).as("c12").isFalse();

ctx.close();
assertThat(c0.closed).as("c0").isTrue();
Expand All @@ -72,6 +78,8 @@ public void beanMethods() {
assertThat(c8.closed).as("c8").isFalse();
assertThat(c9.closed).as("c9").isTrue();
assertThat(c10.closed).as("c10").isTrue();
assertThat(c11.closed).as("c11").isTrue();
assertThat(c12.closed).as("c12").isTrue();
}

@Test
Expand Down Expand Up @@ -171,6 +179,16 @@ public WithDisposableBean c9() {
public WithAutoCloseable c10() {
return new WithAutoCloseable();
}

@Bean
public WithCompletableFutureMethod c11() {
return new WithCompletableFutureMethod();
}

@Bean
public WithReactorMonoMethod c12() {
return new WithReactorMonoMethod();
}
}


Expand Down Expand Up @@ -242,4 +260,38 @@ public void close() {
}
}


static class WithCompletableFutureMethod {

boolean closed = false;

public CompletableFuture<Void> close() {
return CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
closed = true;
});
}
}


static class WithReactorMonoMethod {

boolean closed = false;

public Mono<Void> close() {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return Mono.fromRunnable(() -> closed = true);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.r2dbc.core;

import io.r2dbc.h2.CloseableConnectionFactory;
import io.r2dbc.h2.H2ConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.junit.jupiter.api.AfterEach;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

/**
* @author Juergen Hoeller
* @since 6.1
*/
public class H2DatabaseClientContextIntegrationTests extends H2DatabaseClientIntegrationTests {

AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);

CloseableConnectionFactory connectionFactory = context.getBean(CloseableConnectionFactory.class);


@Override
protected ConnectionFactory createConnectionFactory() {
return connectionFactory;
}

@AfterEach
public void tearDown() {
context.close();
assertThatExceptionOfType(R2dbcNonTransientResourceException.class).isThrownBy(
() -> connectionFactory.create().block());
}


@Configuration
static class Config {

@Bean
ConnectionFactory connectionFactory() {
return H2ConnectionFactory.inMemory("r2dbc-context");
}
}

}

0 comments on commit 322cbca

Please sign in to comment.