Skip to content

Commit

Permalink
fix: possible event-related deadlocks with some providers (#1314)
Browse files Browse the repository at this point in the history
* Move event emitting off the main thread to avoid deadlocks

When stacking event emitting inside an EventProvider, when using sychronization
the EventProvider can deadlock, to avoid this move the event emitting of the
main thread.

Signed-off-by: Philipp Fehre <[email protected]>

* Test fixes

Test provider should respect the init-delay during all test

Signed-off-by: Philipp Fehre <[email protected]>

* Add timeout to EventProviderTest

With the events being executed on a different thread, we need to wait to make
sure the thread is scheduled to have the events emitted.

Signed-off-by: Philipp Fehre <[email protected]>

* Don't reuse the JVM Process

Signed-off-by: Philipp Fehre <[email protected]>

---------

Signed-off-by: Philipp Fehre <[email protected]>
Co-authored-by: Philipp Fehre <[email protected]>
Co-authored-by: Michael Beemer <[email protected]>
Co-authored-by: Todd Baert <[email protected]>
  • Loading branch information
4 people authored Feb 13, 2025
1 parent 08c38fb commit c33ac2d
Show file tree
Hide file tree
Showing 6 changed files with 163 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.5.2</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>
${surefireArgLine}
</argLine>
Expand Down
30 changes: 28 additions & 2 deletions src/main/java/dev/openfeature/sdk/EventProvider.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package dev.openfeature.sdk;

import dev.openfeature.sdk.internal.TriConsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**
* Abstract EventProvider. Providers must extend this class to support events.
Expand All @@ -14,8 +18,10 @@
*
* @see FeatureProvider
*/
@Slf4j
public abstract class EventProvider implements FeatureProvider {
private EventProviderListener eventProviderListener;
private final ExecutorService emitterExecutor = Executors.newCachedThreadPool();

void setEventProviderListener(EventProviderListener eventProviderListener) {
this.eventProviderListener = eventProviderListener;
Expand Down Expand Up @@ -46,6 +52,24 @@ void detach() {
this.onEmit = null;
}

/**
* Stop the event emitter executor and block until either termination has completed
* or timeout period has elapsed.
*/
@Override
public void shutdown() {
emitterExecutor.shutdown();
try {
if (!emitterExecutor.awaitTermination(EventSupport.SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("Emitter executor did not terminate before the timeout period had elapsed");
emitterExecutor.shutdownNow();
}
} catch (InterruptedException e) {
emitterExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}

/**
* Emit the specified {@link ProviderEvent}.
*
Expand All @@ -56,8 +80,10 @@ public void emit(ProviderEvent event, ProviderEventDetails details) {
if (eventProviderListener != null) {
eventProviderListener.onEmit(event, details);
}
if (this.onEmit != null) {
this.onEmit.accept(this, event, details);

final TriConsumer<EventProvider, ProviderEvent, ProviderEventDetails> localOnEmit = this.onEmit;
if (localOnEmit != null) {
emitterExecutor.submit(() -> localOnEmit.accept(this, event, details));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/openfeature/sdk/EventSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
@Slf4j
class EventSupport {

public static final int SHUTDOWN_TIMEOUT_SECONDS = 3;

// we use a v4 uuid as a "placeholder" for anonymous clients, since
// ConcurrentHashMap doesn't support nulls
private static final String defaultClientUuid = UUID.randomUUID().toString();
private static final int SHUTDOWN_TIMEOUT_SECONDS = 3;
private final Map<String, HandlerStore> handlerStores = new ConcurrentHashMap<>();
private final HandlerStore globalHandlerStore = new HandlerStore();
private final ExecutorService taskExecutor = Executors.newCachedThreadPool(runnable -> {
final Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
});

Expand Down
27 changes: 23 additions & 4 deletions src/test/java/dev/openfeature/sdk/EventProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,18 @@
import static org.mockito.Mockito.*;

import dev.openfeature.sdk.internal.TriConsumer;
import dev.openfeature.sdk.testutils.TestStackedEmitCallsProvider;
import io.cucumber.java.AfterAll;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class EventProviderTest {

private static final int TIMEOUT = 300;

private TestEventProvider eventProvider;

@BeforeEach
Expand All @@ -21,6 +26,11 @@ void setup() {
eventProvider.initialize(null);
}

@AfterAll
public static void resetDefaultProvider() {
OpenFeatureAPI.getInstance().setProviderAndWait(new NoOpProvider());
}

@Test
@DisplayName("should run attached onEmit with emitters")
void emitsEventsWhenAttached() {
Expand All @@ -34,10 +44,10 @@ void emitsEventsWhenAttached() {
eventProvider.emitProviderStale(details);
eventProvider.emitProviderError(details);

verify(onEmit, times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details);
verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details);
verify(onEmit, times(1)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details);
verify(onEmit, timeout(TIMEOUT).times(2)).accept(eventProvider, ProviderEvent.PROVIDER_READY, details);
verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, details);
verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_STALE, details);
verify(onEmit, timeout(TIMEOUT)).accept(eventProvider, ProviderEvent.PROVIDER_ERROR, details);
}

@Test
Expand Down Expand Up @@ -75,6 +85,15 @@ void doesNotThrowWhenOnEmitSame() {
eventProvider.attach(onEmit2); // should not throw, same instance. noop
}

@Test
@SneakyThrows
@Timeout(value = 2, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
@DisplayName("should not deadlock on emit called during emit")
void doesNotDeadlockOnEmitStackedCalls() {
TestStackedEmitCallsProvider provider = new TestStackedEmitCallsProvider();
OpenFeatureAPI.getInstance().setProviderAndWait(provider);
}

static class TestEventProvider extends EventProvider {

private static final String NAME = "TestEventProvider";
Expand Down
10 changes: 5 additions & 5 deletions src/test/java/dev/openfeature/sdk/EventsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

class EventsTest {

private static final int TIMEOUT = 300;
private static final int TIMEOUT = 500;
private static final int INIT_DELAY = TIMEOUT / 2;

@AfterAll
Expand Down Expand Up @@ -601,13 +601,13 @@ void matchingStaleEventsMustRunImmediately() {
OpenFeatureAPI api = OpenFeatureAPI.getInstance();

// provider which is already stale
TestEventsProvider provider = TestEventsProvider.newInitializedTestEventsProvider();
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderStale(ProviderEventDetails.builder().build());
assertThat(client.getProviderState()).isEqualTo(ProviderState.STALE);

// should run even thought handler was added after stale
// should run even though handler was added after stale
client.onProviderStale(handler);
verify(handler, timeout(TIMEOUT)).accept(any());
}
Expand All @@ -623,13 +623,13 @@ void matchingErrorEventsMustRunImmediately() {
OpenFeatureAPI api = OpenFeatureAPI.getInstance();

// provider which is already in error
TestEventsProvider provider = new TestEventsProvider();
TestEventsProvider provider = new TestEventsProvider(INIT_DELAY);
Client client = api.getClient(name);
api.setProviderAndWait(name, provider);
provider.emitProviderError(ProviderEventDetails.builder().build());
assertThat(client.getProviderState()).isEqualTo(ProviderState.ERROR);

// should run even thought handler was added after error
// should run even though handler was added after error
client.onProviderError(handler);
verify(handler, timeout(TIMEOUT)).accept(any());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package dev.openfeature.sdk.testutils;

import dev.openfeature.sdk.EvaluationContext;
import dev.openfeature.sdk.EventProvider;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Value;
import java.util.function.Consumer;

public class TestStackedEmitCallsProvider extends EventProvider {
private final NestedBlockingEmitter nestedBlockingEmitter = new NestedBlockingEmitter(this::onProviderEvent);

@Override
public Metadata getMetadata() {
return () -> getClass().getSimpleName();
}

@Override
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (nestedBlockingEmitter) {
nestedBlockingEmitter.init();
while (!nestedBlockingEmitter.isReady()) {
try {
nestedBlockingEmitter.wait();
} catch (InterruptedException e) {
}
}
}
}

private void onProviderEvent(ProviderEvent providerEvent) {
synchronized (nestedBlockingEmitter) {
if (providerEvent == ProviderEvent.PROVIDER_READY) {
nestedBlockingEmitter.setReady();
/*
* This line deadlocked in the original implementation without the emitterExecutor see
* https://github.com/open-feature/java-sdk/issues/1299
*/
emitProviderReady(ProviderEventDetails.builder().build());
}
}
}

@Override
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getBooleanEvaluation'");
}

@Override
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getStringEvaluation'");
}

@Override
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getIntegerEvaluation'");
}

@Override
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getDoubleEvaluation'");
}

@Override
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
throw new UnsupportedOperationException("Unimplemented method 'getObjectEvaluation'");
}

static class NestedBlockingEmitter {

private final Consumer<ProviderEvent> emitProviderEvent;
private volatile boolean isReady;

public NestedBlockingEmitter(Consumer<ProviderEvent> emitProviderEvent) {
this.emitProviderEvent = emitProviderEvent;
}

public void init() {
// run init outside monitored thread
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

emitProviderEvent.accept(ProviderEvent.PROVIDER_READY);
})
.start();
}

public boolean isReady() {
return isReady;
}

public synchronized void setReady() {
isReady = true;
this.notifyAll();
}
}
}

0 comments on commit c33ac2d

Please sign in to comment.