Skip to content

Commit

Permalink
Revert "core: delay sending cancel request on client-side when deadli…
Browse files Browse the repository at this point in the history
…ne expires (#6328)" (#7457)
  • Loading branch information
ran-su authored Sep 25, 2020
1 parent 10b960e commit 7ca6c02
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 133 deletions.
168 changes: 69 additions & 99 deletions core/src/main/java/io/grpc/internal/ClientCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,37 +71,29 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private static final Logger log = Logger.getLogger(ClientCallImpl.class.getName());
private static final byte[] FULL_STREAM_DECOMPRESSION_ENCODINGS
= "gzip".getBytes(Charset.forName("US-ASCII"));
// When a deadline is exceeded, there is a race between the server receiving the cancellation from
// the client and the server cancelling the stream itself. If the client's cancellation is
// received first, then the stream's status will be CANCELLED instead of DEADLINE_EXCEEDED.
// This prevents server monitoring from noticing high rate of DEADLINE_EXCEEDED, a common
// monitoring metric (b/118879795). Mitigate this by delayed sending of the client's cancellation.
@VisibleForTesting
static final long DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);

private final MethodDescriptor<ReqT, RespT> method;
private final Tag tag;
private final Executor callExecutor;
private final boolean callExecutorIsDirect;
private final CallTracer channelCallsTracer;
private final Context context;
private volatile ScheduledFuture<?> deadlineCancellationFuture;
private final boolean unaryRequest;
private CallOptions callOptions;
private ClientStream stream;
private volatile boolean cancelListenersShouldBeRemoved;
private boolean cancelCalled;
private boolean halfCloseCalled;
private final ClientStreamProvider clientStreamProvider;
private ContextCancellationListener cancellationListener;
private final ContextCancellationListener cancellationListener =
new ContextCancellationListener();
private final ScheduledExecutorService deadlineCancellationExecutor;
@Nullable
private final InternalConfigSelector configSelector;
private boolean fullStreamDecompression;
private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
private CompressorRegistry compressorRegistry = CompressorRegistry.getDefaultInstance();
private volatile ScheduledFuture<?> deadlineCancellationNotifyApplicationFuture;
private volatile ScheduledFuture<?> deadlineCancellationSendToServerFuture;
private boolean observerClosed = false;

ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
Expand Down Expand Up @@ -135,20 +127,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
}

private final class ContextCancellationListener implements CancellationListener {
private Listener<RespT> observer;

private ContextCancellationListener(Listener<RespT> observer) {
this.observer = observer;
}

@Override
public void cancelled(Context context) {
if (context.getDeadline() == null || !context.getDeadline().isExpired()) {
stream.cancel(statusFromCancelled(context));
} else {
Status status = statusFromCancelled(context);
delayedCancelOnDeadlineExceeded(status, observer);
}
stream.cancel(statusFromCancelled(context));
}
}

Expand Down Expand Up @@ -223,7 +204,19 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
// Context is already cancelled so no need to create a real stream, just notify the observer
// of cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
executeCloseObserverInContext(observer, statusFromCancelled(context));
final Listener<RespT> finalObserver = observer;
class ClosedByContext extends ContextRunnable {
ClosedByContext() {
super(context);
}

@Override
public void runInContext() {
closeObserver(finalObserver, statusFromCancelled(context), new Metadata());
}
}

callExecutor.execute(new ClosedByContext());
return;
}

Expand Down Expand Up @@ -251,9 +244,23 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
compressor = compressorRegistry.lookupCompressor(compressorName);
if (compressor == null) {
stream = NoopClientStream.INSTANCE;
Status status = Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName));
executeCloseObserverInContext(observer, status);
final Listener<RespT> finalObserver = observer;
class ClosedByNotFoundCompressor extends ContextRunnable {
ClosedByNotFoundCompressor() {
super(context);
}

@Override
public void runInContext() {
closeObserver(
finalObserver,
Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name %s", compressorName)),
new Metadata());
}
}

callExecutor.execute(new ClosedByNotFoundCompressor());
return;
}
} else {
Expand Down Expand Up @@ -294,7 +301,6 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
}
stream.setDecompressorRegistry(decompressorRegistry);
channelCallsTracer.reportCallStarted();
cancellationListener = new ContextCancellationListener(observer);
stream.start(new ClientStreamListenerImpl(observer));

// Delay any sources of cancellation after start(), because most of the transports are broken if
Expand All @@ -306,11 +312,8 @@ private void startInternal(Listener<RespT> observer, Metadata headers) {
// If the context has the effective deadline, we don't need to schedule an extra task.
&& !effectiveDeadline.equals(context.getDeadline())
// If the channel has been terminated, we don't need to schedule an extra task.
&& deadlineCancellationExecutor != null
// if already expired deadline let failing stream handle
&& !(stream instanceof FailingClientStream)) {
deadlineCancellationNotifyApplicationFuture =
startDeadlineNotifyApplicationTimer(effectiveDeadline, observer);
&& deadlineCancellationExecutor != null) {
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
}
if (cancelListenersShouldBeRemoved) {
// Race detected! ClientStreamListener.closed may have been called before
Expand Down Expand Up @@ -410,76 +413,46 @@ private static void logIfContextNarrowedTimeout(

private void removeContextListenerAndCancelDeadlineFuture() {
context.removeListener(cancellationListener);
ScheduledFuture<?> f = deadlineCancellationSendToServerFuture;
if (f != null) {
f.cancel(false);
}

f = deadlineCancellationNotifyApplicationFuture;
ScheduledFuture<?> f = deadlineCancellationFuture;
if (f != null) {
f.cancel(false);
}
}

private ScheduledFuture<?> startDeadlineNotifyApplicationTimer(Deadline deadline,
final Listener<RespT> observer) {
final long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);

class DeadlineExceededNotifyApplicationTimer implements Runnable {
@Override
public void run() {
Status status = buildDeadlineExceededStatusWithRemainingNanos(remainingNanos);
delayedCancelOnDeadlineExceeded(status, observer);
}
}

return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededNotifyApplicationTimer()),
remainingNanos,
TimeUnit.NANOSECONDS);
}

private Status buildDeadlineExceededStatusWithRemainingNanos(long remainingNanos) {
final InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
private class DeadlineTimer implements Runnable {
private final long remainingNanos;

long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);

StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);

return DEADLINE_EXCEEDED.augmentDescription(buf.toString());
}

private void delayedCancelOnDeadlineExceeded(final Status status, Listener<RespT> observer) {
if (deadlineCancellationSendToServerFuture != null) {
return;
DeadlineTimer(long remainingNanos) {
this.remainingNanos = remainingNanos;
}

class DeadlineExceededSendCancelToServerTimer implements Runnable {
@Override
public void run() {
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
stream.cancel(status);
@Override
public void run() {
InsightBuilder insight = new InsightBuilder();
stream.appendTimeoutInsight(insight);
// DelayedStream.cancel() is safe to call from a thread that is different from where the
// stream is created.
long seconds = Math.abs(remainingNanos) / TimeUnit.SECONDS.toNanos(1);
long nanos = Math.abs(remainingNanos) % TimeUnit.SECONDS.toNanos(1);

StringBuilder buf = new StringBuilder();
buf.append("deadline exceeded after ");
if (remainingNanos < 0) {
buf.append('-');
}
buf.append(seconds);
buf.append(String.format(".%09d", nanos));
buf.append("s. ");
buf.append(insight);
stream.cancel(DEADLINE_EXCEEDED.augmentDescription(buf.toString()));
}
}

// This races with removeContextListenerAndCancelDeadlineFuture(). Since calling cancel() on a
// stream multiple time is safe, the race here is fine.
deadlineCancellationSendToServerFuture = deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(new DeadlineExceededSendCancelToServerTimer()),
DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS,
TimeUnit.NANOSECONDS);
executeCloseObserverInContext(observer, status);
private ScheduledFuture<?> startDeadlineTimer(Deadline deadline) {
long remainingNanos = deadline.timeRemaining(TimeUnit.NANOSECONDS);
return deadlineCancellationExecutor.schedule(
new LogExceptionRunnable(
new DeadlineTimer(remainingNanos)), remainingNanos, TimeUnit.NANOSECONDS);
}

private void executeCloseObserverInContext(final Listener<RespT> observer, final Status status) {
Expand All @@ -497,13 +470,6 @@ public void runInContext() {
callExecutor.execute(new CloseInContext());
}

private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
if (!observerClosed) {
observerClosed = true;
observer.onClose(status, trailers);
}
}

@Nullable
private Deadline effectiveDeadline() {
// Call options and context are immutable, so we don't need to cache the deadline.
Expand Down Expand Up @@ -646,6 +612,10 @@ public Attributes getAttributes() {
return Attributes.EMPTY;
}

private void closeObserver(Listener<RespT> observer, Status status, Metadata trailers) {
observer.onClose(status, trailers);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("method", method).toString();
Expand Down
36 changes: 6 additions & 30 deletions core/src/test/java/io/grpc/internal/ClientCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.grpc.internal;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.internal.ClientCallImpl.DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS;
import static io.grpc.internal.GrpcUtil.ACCEPT_ENCODING_SPLITTER;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -60,7 +59,6 @@
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
import io.grpc.internal.testing.SingleMessageProducer;
Expand Down Expand Up @@ -137,9 +135,6 @@ public class ClientCallImplTest {
@Captor
private ArgumentCaptor<Status> statusArgumentCaptor;

@Captor
private ArgumentCaptor<Metadata> metadataArgumentCaptor;

private CallOptions baseCallOptions;

@Before
Expand Down Expand Up @@ -1005,21 +1000,9 @@ public void expiredDeadlineCancelsStream_CallOptions() {

call.start(callListener, new Metadata());

fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);

// Verify cancel sent to application when deadline just past
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
verify(stream, never()).cancel(statusCaptor.capture());

fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS - 1);
verify(stream, never()).cancel(any(Status.class));
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);

// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(1);
verify(stream).cancel(statusCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription())
.matches("deadline exceeded after [0-9]+\\.[0-9]+s. \\[remote_addr=127\\.0\\.0\\.1:443\\]");
Expand All @@ -1029,8 +1012,8 @@ public void expiredDeadlineCancelsStream_CallOptions() {
public void expiredDeadlineCancelsStream_Context() {
fakeClock.forwardTime(System.nanoTime(), TimeUnit.NANOSECONDS);

Deadline deadline = Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker());
Context context = Context.current().withDeadline(deadline, deadlineCancellationExecutor);
Context context = Context.current()
.withDeadlineAfter(1, TimeUnit.SECONDS, deadlineCancellationExecutor);
Context origContext = context.attach();

ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
Expand All @@ -1045,16 +1028,9 @@ public void expiredDeadlineCancelsStream_Context() {

call.start(callListener, new Metadata());

fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
verify(stream, never()).cancel(statusCaptor.capture());
// verify app is notified.
verify(callListener).onClose(statusCaptor.capture(), metadataArgumentCaptor.capture());
assertThat(statusCaptor.getValue().getDescription()).contains("context timed out");
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Code.DEADLINE_EXCEEDED);
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1) + 1);

// verify cancel send to server is delayed with DEADLINE_EXPIRATION_CANCEL_DELAY
fakeClock.forwardNanos(DEADLINE_EXPIRATION_CANCEL_DELAY_NANOS);
verify(stream).cancel(statusCaptor.capture());
verify(stream, times(1)).cancel(statusCaptor.capture());
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()).isEqualTo("context timed out");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3622,7 +3622,7 @@ public ClientTransportFactory buildClientTransportFactory() {
CallOptions.DEFAULT.withDeadlineAfter(5, TimeUnit.SECONDS));
ListenableFuture<Void> future2 = ClientCalls.futureUnaryCall(call2, null);

timer.forwardTime(5, TimeUnit.SECONDS);
timer.forwardTime(1234, TimeUnit.SECONDS);

executor.runDueTasks();
try {
Expand All @@ -3633,9 +3633,6 @@ public ClientTransportFactory buildClientTransportFactory() {
}

mychannel.shutdownNow();
// Now for Deadline_exceeded, stream shutdown is delayed, calling shutdownNow() on a open stream
// will add a task to executor. Cleaning that task here.
executor.runDueTasks();
}

@Deprecated
Expand Down

0 comments on commit 7ca6c02

Please sign in to comment.