Skip to content

Commit 9773b38

Browse files
committed
transport: remove RequireHandshakeHybrid support
This removes RequireHandshakeHybrid support and changes the default behavior to RequireHandshakeOn. Dial calls will now block and wait for a successful handshake before proceeding. Users relying on the old hybrid behavior (cmux users) should consult soheilhy/cmux#64. Also, several tests have been updated to take this into consideration by sending settings frames.
1 parent 32559e2 commit 9773b38

File tree

4 files changed

+160
-228
lines changed

4 files changed

+160
-228
lines changed

Diff for: clientconn.go

+52-106
Original file line numberDiff line numberDiff line change
@@ -1020,35 +1020,8 @@ func (ac *addrConn) resetTransport() {
10201020
reconnect := grpcsync.NewEvent()
10211021
prefaceReceived := make(chan struct{})
10221022
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1023-
if err == nil {
1024-
ac.mu.Lock()
1025-
ac.curAddr = addr
1026-
ac.transport = newTr
1027-
ac.mu.Unlock()
1028-
1029-
healthCheckConfig := ac.cc.healthCheckConfig()
1030-
// LB channel health checking is only enabled when all the four requirements below are met:
1031-
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1032-
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1033-
// 3. a service config with non-empty healthCheckConfig field is provided,
1034-
// 4. the current load balancer allows it.
1035-
healthcheckManagingState := false
1036-
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1037-
if ac.cc.dopts.healthCheckFunc == nil {
1038-
// TODO: add a link to the health check doc in the error message.
1039-
grpclog.Error("the client side LB channel health check function has not been set.")
1040-
} else {
1041-
// TODO(deklerk) refactor to just return transport
1042-
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1043-
healthcheckManagingState = true
1044-
}
1045-
}
1046-
if !healthcheckManagingState {
1047-
ac.mu.Lock()
1048-
ac.updateConnectivityState(connectivity.Ready)
1049-
ac.mu.Unlock()
1050-
}
1051-
} else {
1023+
if err != nil {
1024+
ac.cc.blockingpicker.updateConnectionError(err)
10521025
hcancel()
10531026
if err == errConnClosing {
10541027
return
@@ -1061,55 +1034,46 @@ func (ac *addrConn) resetTransport() {
10611034
}
10621035

10631036
ac.mu.Lock()
1064-
reqHandshake := ac.dopts.reqHandshake
1037+
ac.curAddr = addr
1038+
ac.transport = newTr
10651039
ac.mu.Unlock()
10661040

1067-
<-reconnect.Done()
1068-
hcancel()
1069-
1070-
if reqHandshake == envconfig.RequireHandshakeHybrid {
1071-
// In RequireHandshakeHybrid mode, we must check to see whether
1072-
// server preface has arrived yet to decide whether to start
1073-
// reconnecting at the top of the list (server preface received)
1074-
// or continue with the next addr in the list as if the
1075-
// connection were not successful (server preface not received).
1076-
select {
1077-
case <-prefaceReceived:
1078-
// We received a server preface - huzzah! We consider this
1079-
// a success and restart from the top of the addr list.
1080-
ac.mu.Lock()
1081-
ac.backoffIdx = 0
1082-
ac.mu.Unlock()
1083-
break addrLoop
1084-
default:
1085-
// Despite having set state to READY, in hybrid mode we
1086-
// consider this a failure and continue connecting at the
1087-
// next addr in the list.
1088-
ac.mu.Lock()
1089-
if ac.state == connectivity.Shutdown {
1090-
ac.mu.Unlock()
1091-
return
1092-
}
1093-
1094-
ac.updateConnectivityState(connectivity.TransientFailure)
1095-
ac.mu.Unlock()
1096-
1097-
if tryNextAddrFromStart.HasFired() {
1098-
break addrLoop
1099-
}
1041+
healthCheckConfig := ac.cc.healthCheckConfig()
1042+
// LB channel health checking is only enabled when all the four requirements below are met:
1043+
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1044+
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1045+
// 3. a service config with non-empty healthCheckConfig field is provided,
1046+
// 4. the current load balancer allows it.
1047+
healthcheckManagingState := false
1048+
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1049+
if ac.cc.dopts.healthCheckFunc == nil {
1050+
// TODO: add a link to the health check doc in the error message.
1051+
grpclog.Error("the client side LB channel health check function has not been set.")
1052+
} else {
1053+
// TODO(deklerk) refactor to just return transport
1054+
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1055+
healthcheckManagingState = true
11001056
}
1101-
} else {
1102-
// In RequireHandshakeOn mode, we would have already waited for
1103-
// the server preface, so we consider this a success and restart
1104-
// from the top of the addr list. In RequireHandshakeOff mode,
1105-
// we don't care to wait for the server preface before
1106-
// considering this a success, so we also restart from the top
1107-
// of the addr list.
1057+
}
1058+
if !healthcheckManagingState {
11081059
ac.mu.Lock()
1109-
ac.backoffIdx = 0
1060+
ac.updateConnectivityState(connectivity.Ready)
11101061
ac.mu.Unlock()
1111-
break addrLoop
11121062
}
1063+
1064+
<-reconnect.Done()
1065+
hcancel()
1066+
1067+
// In RequireHandshakeOn mode, we would have already waited for
1068+
// the server preface, so we consider this a success and restart
1069+
// from the top of the addr list. In RequireHandshakeOff mode,
1070+
// we don't care to wait for the server preface before
1071+
// considering this a success, so we also restart from the top
1072+
// of the addr list.
1073+
ac.mu.Lock()
1074+
ac.backoffIdx = 0
1075+
ac.mu.Unlock()
1076+
break addrLoop
11131077
}
11141078

11151079
// After exhausting all addresses, or after need to reconnect after a
@@ -1155,8 +1119,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
11551119
Authority: ac.cc.authority,
11561120
}
11571121

1158-
prefaceTimer := time.NewTimer(time.Until(connectDeadline))
1159-
11601122
onGoAway := func(r transport.GoAwayReason) {
11611123
ac.mu.Lock()
11621124
ac.adjustParams(r)
@@ -1166,13 +1128,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
11661128

11671129
onClose := func() {
11681130
close(onCloseCalled)
1169-
prefaceTimer.Stop()
11701131
reconnect.Fire()
11711132
}
11721133

11731134
onPrefaceReceipt := func() {
11741135
close(prefaceReceived)
1175-
prefaceTimer.Stop()
11761136
}
11771137

11781138
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@@ -1182,38 +1142,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
11821142
}
11831143

11841144
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1185-
1186-
if err == nil {
1187-
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1188-
select {
1189-
case <-prefaceTimer.C:
1190-
// We didn't get the preface in time.
1191-
newTr.Close()
1192-
err = errors.New("timed out waiting for server handshake")
1193-
case <-prefaceReceived:
1194-
// We got the preface - huzzah! things are good.
1195-
case <-onCloseCalled:
1196-
// The transport has already closed - noop.
1197-
return nil, errors.New("connection closed")
1198-
}
1199-
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1200-
go func() {
1201-
select {
1202-
case <-prefaceTimer.C:
1203-
// We didn't get the preface in time.
1204-
newTr.Close()
1205-
case <-prefaceReceived:
1206-
// We got the preface just in the nick of time - huzzah!
1207-
case <-onCloseCalled:
1208-
// The transport has already closed - noop.
1209-
}
1210-
}()
1211-
}
1212-
}
1213-
12141145
if err != nil {
12151146
// newTr is either nil, or closed.
1216-
ac.cc.blockingpicker.updateConnectionError(err)
12171147
ac.mu.Lock()
12181148
if ac.state == connectivity.Shutdown {
12191149
// ac.tearDown(...) has been invoked.
@@ -1226,6 +1156,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
12261156
return nil, err
12271157
}
12281158

1159+
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1160+
select {
1161+
case <-time.After(connectDeadline.Sub(time.Now())):
1162+
// We didn't get the preface in time.
1163+
newTr.Close()
1164+
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1165+
return nil, errors.New("timed out waiting for server handshake")
1166+
case <-prefaceReceived:
1167+
// We got the preface - huzzah! things are good.
1168+
case <-onCloseCalled:
1169+
// The transport has already closed - noop.
1170+
return nil, errors.New("connection closed")
1171+
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1172+
}
1173+
}
1174+
12291175
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
12301176
ac.mu.Lock()
12311177
if ac.state == connectivity.Shutdown {

0 commit comments

Comments
 (0)