Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;

Expand All @@ -36,25 +39,42 @@ interface Factory<T extends MessageStreams> {
T withLifecycle(Closeable requester);
}

final class ResponseListener<T> implements BiConsumer<T, Throwable> {
private final Future<?> cancelHandle;
private final BiConsumer<? super T, ? super Throwable> listener;
class ResponseListener<T> implements BiConsumer<T, Throwable> {
final Future<?> cancelHandle;
final BiConsumer<? super T, ? super Throwable> listener;

private ResponseListener(
ResponseListener(
CompletionStage<?> response, @Nullable BiConsumer<? super T, ? super Throwable> listener) {
this.cancelHandle = response.toCompletableFuture();
this.listener = listener;
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response, BiConsumer<? super T, ? super Throwable> listener) {
return new ResponseListener<T>(response, listener);
return new ResponseListener<>(response, listener);
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response,
BiConsumer<? super T, ? super Throwable> listener,
@Nullable ScheduledExecutorService timeoutScheduler,
long timeoutMillis) {
return new ResponseTimeoutListener<T>(response, listener)
.scheduleTimeout(timeoutScheduler, timeoutMillis);
}

public static <T> ResponseListener<T> create(CompletionStage<?> response) {
return new ResponseListener<>(response, null);
}

public static <T> ResponseListener<T> create(
CompletionStage<?> response,
@Nullable ScheduledExecutorService timeoutScheduler,
long timeoutMillis) {
return new ResponseTimeoutListener<T>(response, null)
.scheduleTimeout(timeoutScheduler, timeoutMillis);
}

@Override
public void accept(T t, Throwable throwable) {
if (throwable instanceof CancellationException) {
Expand All @@ -67,6 +87,46 @@ public void accept(T t, Throwable throwable) {
}
}

final class ResponseTimeoutListener<T> extends ResponseListener<T> {
volatile ScheduledFuture<?> timeoutHandle;

ResponseTimeoutListener(
CompletionStage<?> response, @Nullable BiConsumer<? super T, ? super Throwable> listener) {
super(response, listener);
}

ResponseListener<T> scheduleTimeout(
@Nullable ScheduledExecutorService timeoutScheduler, long timeoutMillis) {
if (timeoutMillis > 0 && timeoutScheduler != null) {
timeoutHandle =
timeoutScheduler.schedule(
() -> {
cancelHandle.cancel(true);
},
timeoutMillis,
TimeUnit.MILLISECONDS);
}
return this;
}

@Override
public void accept(T t, Throwable throwable) {
ScheduledFuture<?> h = timeoutHandle;
if (h != null) {
h.cancel(true);
}
super.accept(t, throwable);
}
}

static ScheduledExecutorService timeoutScheduler(
MessageStreams messageStreams, long timeoutMillis) {
if (timeoutMillis <= 0) {
return null;
}
return messageStreams.scheduler().orElse(null);
}

@SuppressWarnings("all")
abstract class ServerFactory<T extends MessageStreams> implements RpcService.Factory<T> {
private final Object service;
Expand Down
Loading