2828import static io .grpc .internal .GrpcUtil .CONTENT_LENGTH_KEY ;
2929import static io .grpc .internal .GrpcUtil .MESSAGE_ACCEPT_ENCODING_KEY ;
3030import static io .grpc .internal .GrpcUtil .MESSAGE_ENCODING_KEY ;
31- import static java .lang .Math .max ;
3231
3332import com .google .common .annotations .VisibleForTesting ;
3433import com .google .common .base .MoreObjects ;
6261import java .util .concurrent .ScheduledExecutorService ;
6362import java .util .concurrent .ScheduledFuture ;
6463import java .util .concurrent .TimeUnit ;
64+ import java .util .concurrent .TimeoutException ;
6565import java .util .logging .Level ;
6666import java .util .logging .Logger ;
6767import javax .annotation .Nullable ;
@@ -82,16 +82,13 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
8282 private final boolean callExecutorIsDirect ;
8383 private final CallTracer channelCallsTracer ;
8484 private final Context context ;
85- private volatile ScheduledFuture <?> deadlineCancellationFuture ;
85+ private CancellationHandler cancellationHandler ;
8686 private final boolean unaryRequest ;
8787 private CallOptions callOptions ;
8888 private ClientStream stream ;
89- private volatile boolean cancelListenersShouldBeRemoved ;
9089 private boolean cancelCalled ;
9190 private boolean halfCloseCalled ;
9291 private final ClientStreamProvider clientStreamProvider ;
93- private final ContextCancellationListener cancellationListener =
94- new ContextCancellationListener ();
9592 private final ScheduledExecutorService deadlineCancellationExecutor ;
9693 private boolean fullStreamDecompression ;
9794 private DecompressorRegistry decompressorRegistry = DecompressorRegistry .getDefaultInstance ();
@@ -128,13 +125,6 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
128125 PerfMark .event ("ClientCall.<init>" , tag );
129126 }
130127
131- private final class ContextCancellationListener implements CancellationListener {
132- @ Override
133- public void cancelled (Context context ) {
134- stream .cancel (statusFromCancelled (context ));
135- }
136- }
137-
138128 /**
139129 * Provider of {@link ClientStream}s.
140130 */
@@ -252,21 +242,21 @@ public void runInContext() {
252242 prepareHeaders (headers , decompressorRegistry , compressor , fullStreamDecompression );
253243
254244 Deadline effectiveDeadline = effectiveDeadline ();
255- boolean deadlineExceeded = effectiveDeadline != null && effectiveDeadline .isExpired ();
245+ boolean contextIsDeadlineSource = effectiveDeadline != null
246+ && effectiveDeadline .equals (context .getDeadline ());
247+ cancellationHandler = new CancellationHandler (effectiveDeadline , contextIsDeadlineSource );
248+ boolean deadlineExceeded = effectiveDeadline != null && cancellationHandler .remainingNanos <= 0 ;
256249 if (!deadlineExceeded ) {
257- logIfContextNarrowedTimeout (
258- effectiveDeadline , context .getDeadline (), callOptions .getDeadline ());
259250 stream = clientStreamProvider .newStream (method , callOptions , headers , context );
260251 } else {
261252 ClientStreamTracer [] tracers =
262253 GrpcUtil .getClientStreamTracers (callOptions , headers , 0 , false );
263- String deadlineName =
264- isFirstMin (callOptions .getDeadline (), context .getDeadline ()) ? "CallOptions" : "Context" ;
254+ String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions" ;
265255 Long nameResolutionDelay = callOptions .getOption (NAME_RESOLUTION_DELAYED );
266256 String description = String .format (
267257 "ClientCall started after %s deadline was exceeded %.9f seconds ago. "
268258 + "Name resolution delay %.9f seconds." , deadlineName ,
269- effectiveDeadline . timeRemaining ( TimeUnit . NANOSECONDS ) / NANO_TO_SECS ,
259+ cancellationHandler . remainingNanos / NANO_TO_SECS ,
270260 nameResolutionDelay == null ? 0 : nameResolutionDelay / NANO_TO_SECS );
271261 stream = new FailingClientStream (DEADLINE_EXCEEDED .withDescription (description ), tracers );
272262 }
@@ -298,21 +288,7 @@ public void runInContext() {
298288 // they receive cancel before start. Issue #1343 has more details
299289
300290 // Propagate later Context cancellation to the remote side.
301- context .addListener (cancellationListener , directExecutor ());
302- if (effectiveDeadline != null
303- // If the context has the effective deadline, we don't need to schedule an extra task.
304- && !effectiveDeadline .equals (context .getDeadline ())
305- // If the channel has been terminated, we don't need to schedule an extra task.
306- && deadlineCancellationExecutor != null ) {
307- deadlineCancellationFuture = startDeadlineTimer (effectiveDeadline );
308- }
309- if (cancelListenersShouldBeRemoved ) {
310- // Race detected! ClientStreamListener.closed may have been called before
311- // deadlineCancellationFuture was set / context listener added, thereby preventing the future
312- // and listener from being cancelled. Go ahead and cancel again, just to be sure it
313- // was cancelled.
314- removeContextListenerAndCancelDeadlineFuture ();
315- }
291+ cancellationHandler .setUp ();
316292 }
317293
318294 private void applyMethodConfig () {
@@ -354,75 +330,96 @@ private void applyMethodConfig() {
354330 }
355331 }
356332
357- private static void logIfContextNarrowedTimeout (
358- Deadline effectiveDeadline , @ Nullable Deadline outerCallDeadline ,
359- @ Nullable Deadline callDeadline ) {
360- if (!log .isLoggable (Level .FINE ) || effectiveDeadline == null
361- || !effectiveDeadline .equals (outerCallDeadline )) {
362- return ;
333+ private final class CancellationHandler implements Runnable , CancellationListener {
334+ private final boolean contextIsDeadlineSource ;
335+ private final boolean hasDeadline ;
336+ private final long remainingNanos ;
337+ private volatile ScheduledFuture <?> deadlineCancellationFuture ;
338+ private volatile boolean tearDownCalled ;
339+
340+ CancellationHandler (Deadline deadline , boolean contextIsDeadlineSource ) {
341+ this .contextIsDeadlineSource = contextIsDeadlineSource ;
342+ if (deadline == null ) {
343+ hasDeadline = false ;
344+ remainingNanos = 0 ;
345+ } else {
346+ hasDeadline = true ;
347+ remainingNanos = deadline .timeRemaining (TimeUnit .NANOSECONDS );
348+ }
363349 }
364350
365- long effectiveTimeout = max (0 , effectiveDeadline .timeRemaining (TimeUnit .NANOSECONDS ));
366- StringBuilder builder = new StringBuilder (String .format (
367- Locale .US ,
368- "Call timeout set to '%d' ns, due to context deadline." , effectiveTimeout ));
369- if (callDeadline == null ) {
370- builder .append (" Explicit call timeout was not set." );
371- } else {
372- long callTimeout = callDeadline .timeRemaining (TimeUnit .NANOSECONDS );
373- builder .append (String .format (Locale .US , " Explicit call timeout was '%d' ns." , callTimeout ));
351+ void setUp () {
352+ if (tearDownCalled ) {
353+ return ;
354+ }
355+ if (hasDeadline
356+ // If the context has the effective deadline, we don't need to schedule an extra task.
357+ && !contextIsDeadlineSource
358+ // If the channel has been terminated, we don't need to schedule an extra task.
359+ && deadlineCancellationExecutor != null ) {
360+ deadlineCancellationFuture = deadlineCancellationExecutor .schedule (
361+ new LogExceptionRunnable (this ), remainingNanos , TimeUnit .NANOSECONDS );
362+ }
363+ context .addListener (this , directExecutor ());
364+ if (tearDownCalled ) {
365+ // Race detected! Re-run to make sure the future is cancelled and context listener removed
366+ tearDown ();
367+ }
374368 }
375369
376- log . fine ( builder . toString ());
377- }
378-
379- private void removeContextListenerAndCancelDeadlineFuture () {
380- context . removeListener ( cancellationListener );
381- ScheduledFuture <?> f = deadlineCancellationFuture ;
382- if ( f != null ) {
383- f . cancel ( false );
370+ // May be called multiple times, and race with setUp()
371+ void tearDown () {
372+ tearDownCalled = true ;
373+ ScheduledFuture <?> deadlineCancellationFuture = this . deadlineCancellationFuture ;
374+ if ( deadlineCancellationFuture != null ) {
375+ deadlineCancellationFuture . cancel ( false ) ;
376+ }
377+ context . removeListener ( this );
384378 }
385- }
386379
387- private class DeadlineTimer implements Runnable {
388- private final long remainingNanos ;
389-
390- DeadlineTimer (long remainingNanos ) {
391- this .remainingNanos = remainingNanos ;
380+ @ Override
381+ public void cancelled (Context context ) {
382+ if (hasDeadline && contextIsDeadlineSource
383+ && context .cancellationCause () instanceof TimeoutException ) {
384+ stream .cancel (formatDeadlineExceededStatus ());
385+ return ;
386+ }
387+ stream .cancel (statusFromCancelled (context ));
392388 }
393389
394390 @ Override
395391 public void run () {
396- InsightBuilder insight = new InsightBuilder ();
397- stream .appendTimeoutInsight (insight );
392+ stream .cancel (formatDeadlineExceededStatus ());
393+ }
394+
395+ Status formatDeadlineExceededStatus () {
398396 // DelayedStream.cancel() is safe to call from a thread that is different from where the
399397 // stream is created.
400398 long seconds = Math .abs (remainingNanos ) / TimeUnit .SECONDS .toNanos (1 );
401399 long nanos = Math .abs (remainingNanos ) % TimeUnit .SECONDS .toNanos (1 );
402400
403401 StringBuilder buf = new StringBuilder ();
404- buf .append ("deadline exceeded after " );
402+ buf .append (contextIsDeadlineSource ? "Context" : "CallOptions" );
403+ buf .append (" deadline exceeded after " );
405404 if (remainingNanos < 0 ) {
406405 buf .append ('-' );
407406 }
408407 buf .append (seconds );
409408 buf .append (String .format (Locale .US , ".%09d" , nanos ));
410409 buf .append ("s. " );
411410 Long nsDelay = callOptions .getOption (NAME_RESOLUTION_DELAYED );
412- buf .append (String .format (Locale .US , "Name resolution delay %.9f seconds. " ,
411+ buf .append (String .format (Locale .US , "Name resolution delay %.9f seconds." ,
413412 nsDelay == null ? 0 : nsDelay / NANO_TO_SECS ));
414- buf .append (insight );
415- stream .cancel (DEADLINE_EXCEEDED .augmentDescription (buf .toString ()));
413+ if (stream != null ) {
414+ InsightBuilder insight = new InsightBuilder ();
415+ stream .appendTimeoutInsight (insight );
416+ buf .append (" " );
417+ buf .append (insight );
418+ }
419+ return DEADLINE_EXCEEDED .withDescription (buf .toString ());
416420 }
417421 }
418422
419- private ScheduledFuture <?> startDeadlineTimer (Deadline deadline ) {
420- long remainingNanos = deadline .timeRemaining (TimeUnit .NANOSECONDS );
421- return deadlineCancellationExecutor .schedule (
422- new LogExceptionRunnable (
423- new DeadlineTimer (remainingNanos )), remainingNanos , TimeUnit .NANOSECONDS );
424- }
425-
426423 @ Nullable
427424 private Deadline effectiveDeadline () {
428425 // Call options and context are immutable, so we don't need to cache the deadline.
@@ -440,16 +437,6 @@ private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline dea
440437 return deadline0 .minimum (deadline1 );
441438 }
442439
443- private static boolean isFirstMin (@ Nullable Deadline deadline0 , @ Nullable Deadline deadline1 ) {
444- if (deadline0 == null ) {
445- return false ;
446- }
447- if (deadline1 == null ) {
448- return true ;
449- }
450- return deadline0 .isBefore (deadline1 );
451- }
452-
453440 @ Override
454441 public void request (int numMessages ) {
455442 try (TaskCloseable ignore = PerfMark .traceTask ("ClientCall.request" )) {
@@ -493,7 +480,10 @@ private void cancelInternal(@Nullable String message, @Nullable Throwable cause)
493480 stream .cancel (status );
494481 }
495482 } finally {
496- removeContextListenerAndCancelDeadlineFuture ();
483+ // start() might not have been called
484+ if (cancellationHandler != null ) {
485+ cancellationHandler .tearDown ();
486+ }
497487 }
498488 }
499489
@@ -699,10 +689,7 @@ private void closedInternal(
699689 // description. Since our timer may be delayed in firing, we double-check the deadline and
700690 // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
701691 if (deadline .isExpired ()) {
702- InsightBuilder insight = new InsightBuilder ();
703- stream .appendTimeoutInsight (insight );
704- status = DEADLINE_EXCEEDED .augmentDescription (
705- "ClientCall was cancelled at or after deadline. " + insight );
692+ status = cancellationHandler .formatDeadlineExceededStatus ();
706693 // Replace trailers to prevent mixing sources of status and trailers.
707694 trailers = new Metadata ();
708695 }
@@ -725,6 +712,7 @@ public void runInContext() {
725712 }
726713
727714 private void runInternal () {
715+ cancellationHandler .tearDown ();
728716 Status status = savedStatus ;
729717 Metadata trailers = savedTrailers ;
730718 if (exceptionStatus != null ) {
@@ -737,11 +725,9 @@ private void runInternal() {
737725 // Replace trailers to prevent mixing sources of status and trailers.
738726 trailers = new Metadata ();
739727 }
740- cancelListenersShouldBeRemoved = true ;
741728 try {
742729 closeObserver (observer , status , trailers );
743730 } finally {
744- removeContextListenerAndCancelDeadlineFuture ();
745731 channelCallsTracer .reportCallEnded (status .isOk ());
746732 }
747733 }
0 commit comments