Skip to content

Commit 1ddab33

Browse files
authored
client: fix detection of whether IO was performed in NewStream (#4611)
For transparent retry. Also allow non-WFR RPCs to retry indefinitely on errors that resulted in no I/O; the spec used to forbid it at one point during development, but it no longer does.
1 parent 582ef45 commit 1ddab33

File tree

4 files changed

+70
-44
lines changed

4 files changed

+70
-44
lines changed

internal/transport/http2_client.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -616,26 +616,39 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
616616
return callAuthData, nil
617617
}
618618

619-
// PerformedIOError wraps an error to indicate IO may have been performed
620-
// before the error occurred.
621-
type PerformedIOError struct {
619+
// NewStreamError wraps an error and reports additional information.
620+
type NewStreamError struct {
622621
Err error
622+
623+
DoNotRetry bool
624+
PerformedIO bool
623625
}
624626

625-
// Error implements error.
626-
func (p PerformedIOError) Error() string {
627-
return p.Err.Error()
627+
func (e NewStreamError) Error() string {
628+
return e.Err.Error()
628629
}
629630

630631
// NewStream creates a stream and registers it into the transport as "active"
631-
// streams.
632+
// streams. All non-nil errors returned will be *NewStreamError.
632633
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
634+
defer func() {
635+
if err != nil {
636+
nse, ok := err.(*NewStreamError)
637+
if !ok {
638+
nse = &NewStreamError{Err: err}
639+
}
640+
if len(t.perRPCCreds) > 0 || callHdr.Creds != nil {
641+
// We may have performed I/O in the per-RPC creds callback, so do not
642+
// allow transparent retry.
643+
nse.PerformedIO = true
644+
}
645+
err = nse
646+
}
647+
}()
633648
ctx = peer.NewContext(ctx, t.getPeer())
634649
headerFields, err := t.createHeaderFields(ctx, callHdr)
635650
if err != nil {
636-
// We may have performed I/O in the per-RPC creds callback, so do not
637-
// allow transparent retry.
638-
return nil, PerformedIOError{err}
651+
return nil, err
639652
}
640653
s := t.newStream(ctx, callHdr)
641654
cleanup := func(err error) {
@@ -741,7 +754,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
741754
break
742755
}
743756
if hdrListSizeErr != nil {
744-
return nil, hdrListSizeErr
757+
return nil, &NewStreamError{Err: hdrListSizeErr, DoNotRetry: true}
745758
}
746759
firstTry = false
747760
select {

internal/transport/transport_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ func (s) TestGracefulClose(t *testing.T) {
780780
go func() {
781781
defer wg.Done()
782782
str, err := ct.NewStream(ctx, &CallHdr{})
783-
if err == ErrConnClosing {
783+
if err != nil && err.(*NewStreamError).Err == ErrConnClosing {
784784
return
785785
} else if err != nil {
786786
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)

rpc_util.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -829,26 +829,28 @@ func Errorf(c codes.Code, format string, a ...interface{}) error {
829829

830830
// toRPCErr converts an error into an error from the status package.
831831
func toRPCErr(err error) error {
832-
if err == nil || err == io.EOF {
832+
switch err {
833+
case nil, io.EOF:
833834
return err
834-
}
835-
if err == io.ErrUnexpectedEOF {
835+
case context.DeadlineExceeded:
836+
return status.Error(codes.DeadlineExceeded, err.Error())
837+
case context.Canceled:
838+
return status.Error(codes.Canceled, err.Error())
839+
case io.ErrUnexpectedEOF:
836840
return status.Error(codes.Internal, err.Error())
837841
}
838-
if _, ok := status.FromError(err); ok {
839-
return err
840-
}
842+
841843
switch e := err.(type) {
842844
case transport.ConnectionError:
843845
return status.Error(codes.Unavailable, e.Desc)
844-
default:
845-
switch err {
846-
case context.DeadlineExceeded:
847-
return status.Error(codes.DeadlineExceeded, err.Error())
848-
case context.Canceled:
849-
return status.Error(codes.Canceled, err.Error())
850-
}
846+
case *transport.NewStreamError:
847+
return toRPCErr(e.Err)
848+
}
849+
850+
if _, ok := status.FromError(err); ok {
851+
return err
851852
}
853+
852854
return status.Error(codes.Unknown, err.Error())
853855
}
854856

stream.go

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -421,12 +421,9 @@ func (a *csAttempt) newStream() error {
421421
cs.callHdr.PreviousAttempts = cs.numRetries
422422
s, err := a.t.NewStream(cs.ctx, cs.callHdr)
423423
if err != nil {
424-
if _, ok := err.(transport.PerformedIOError); ok {
425-
// Return without converting to an RPC error so retry code can
426-
// inspect.
427-
return err
428-
}
429-
return toRPCErr(err)
424+
// Return without converting to an RPC error so retry code can
425+
// inspect.
426+
return err
430427
}
431428
cs.attempt.s = s
432429
cs.attempt.p = &parser{r: s}
@@ -525,19 +522,28 @@ func (cs *clientStream) commitAttempt() {
525522
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
526523
// the error that should be returned by the operation.
527524
func (cs *clientStream) shouldRetry(err error) error {
528-
unprocessed := false
529525
if cs.attempt.s == nil {
530-
pioErr, ok := err.(transport.PerformedIOError)
531-
if ok {
532-
// Unwrap error.
533-
err = toRPCErr(pioErr.Err)
534-
} else {
535-
unprocessed = true
526+
// Error from NewClientStream.
527+
nse, ok := err.(*transport.NewStreamError)
528+
if !ok {
529+
// Unexpected, but assume no I/O was performed and the RPC is not
530+
// fatal, so retry indefinitely.
531+
return nil
536532
}
537-
if !ok && !cs.callInfo.failFast {
538-
// In the event of a non-IO operation error from NewStream, we
539-
// never attempted to write anything to the wire, so we can retry
540-
// indefinitely for non-fail-fast RPCs.
533+
534+
// Unwrap and convert error.
535+
err = toRPCErr(nse.Err)
536+
537+
// Never retry DoNotRetry errors, which indicate the RPC should not be
538+
// retried due to max header list size violation, etc.
539+
if nse.DoNotRetry {
540+
return err
541+
}
542+
543+
// In the event of a non-IO operation error from NewStream, we never
544+
// attempted to write anything to the wire, so we can retry
545+
// indefinitely.
546+
if !nse.PerformedIO {
541547
return nil
542548
}
543549
}
@@ -546,6 +552,7 @@ func (cs *clientStream) shouldRetry(err error) error {
546552
return err
547553
}
548554
// Wait for the trailers.
555+
unprocessed := false
549556
if cs.attempt.s != nil {
550557
<-cs.attempt.s.Done()
551558
unprocessed = cs.attempt.s.Unprocessed()
@@ -634,7 +641,7 @@ func (cs *clientStream) shouldRetry(err error) error {
634641
// Returns nil if a retry was performed and succeeded; error otherwise.
635642
func (cs *clientStream) retryLocked(lastErr error) error {
636643
for {
637-
cs.attempt.finish(lastErr)
644+
cs.attempt.finish(toRPCErr(lastErr))
638645
if err := cs.shouldRetry(lastErr); err != nil {
639646
cs.commitAttemptLocked()
640647
return err
@@ -661,7 +668,11 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
661668
for {
662669
if cs.committed {
663670
cs.mu.Unlock()
664-
return op(cs.attempt)
671+
// toRPCErr is used in case the error from the attempt comes from
672+
// NewClientStream, which intentionally doesn't return a status
673+
// error to allow for further inspection; all other errors should
674+
// already be status errors.
675+
return toRPCErr(op(cs.attempt))
665676
}
666677
a := cs.attempt
667678
cs.mu.Unlock()

0 commit comments

Comments
 (0)