Skip to content

Commit a195f7c

Browse files
committed
transport: remove RequireHandshakeHybrid support
This removes RequireHandshakeHybrid support and changes the default behavior to RequireHandshakeOn. Dial calls will now block and wait for a successful handshake before proceeding. Users relying on the old hybrid behavior (cmux users) should consult soheilhy/cmux#64. Also, several tests have been updated to take this into consideration by sending settings frames.
1 parent 2259ee6 commit a195f7c

File tree

4 files changed

+161
-224
lines changed

4 files changed

+161
-224
lines changed

Diff for: clientconn.go

+52-102
Original file line numberDiff line numberDiff line change
@@ -1022,35 +1022,8 @@ func (ac *addrConn) resetTransport() {
10221022
reconnect := grpcsync.NewEvent()
10231023
prefaceReceived := make(chan struct{})
10241024
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1025-
if err == nil {
1026-
ac.mu.Lock()
1027-
ac.curAddr = addr
1028-
ac.transport = newTr
1029-
ac.mu.Unlock()
1030-
1031-
healthCheckConfig := ac.cc.healthCheckConfig()
1032-
// LB channel health checking is only enabled when all the four requirements below are met:
1033-
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1034-
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1035-
// 3. a service config with non-empty healthCheckConfig field is provided,
1036-
// 4. the current load balancer allows it.
1037-
healthcheckManagingState := false
1038-
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1039-
if ac.cc.dopts.healthCheckFunc == nil {
1040-
// TODO: add a link to the health check doc in the error message.
1041-
grpclog.Error("the client side LB channel health check function has not been set.")
1042-
} else {
1043-
// TODO(deklerk) refactor to just return transport
1044-
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1045-
healthcheckManagingState = true
1046-
}
1047-
}
1048-
if !healthcheckManagingState {
1049-
ac.mu.Lock()
1050-
ac.updateConnectivityState(connectivity.Ready)
1051-
ac.mu.Unlock()
1052-
}
1053-
} else {
1025+
if err != nil {
1026+
ac.cc.blockingpicker.updateConnectionError(err)
10541027
hcancel()
10551028
if err == errConnClosing {
10561029
return
@@ -1063,55 +1036,46 @@ func (ac *addrConn) resetTransport() {
10631036
}
10641037

10651038
ac.mu.Lock()
1066-
reqHandshake := ac.dopts.reqHandshake
1039+
ac.curAddr = addr
1040+
ac.transport = newTr
10671041
ac.mu.Unlock()
10681042

1069-
<-reconnect.Done()
1070-
hcancel()
1071-
1072-
if reqHandshake == envconfig.RequireHandshakeHybrid {
1073-
// In RequireHandshakeHybrid mode, we must check to see whether
1074-
// server preface has arrived yet to decide whether to start
1075-
// reconnecting at the top of the list (server preface received)
1076-
// or continue with the next addr in the list as if the
1077-
// connection were not successful (server preface not received).
1078-
select {
1079-
case <-prefaceReceived:
1080-
// We received a server preface - huzzah! We consider this
1081-
// a success and restart from the top of the addr list.
1082-
ac.mu.Lock()
1083-
ac.backoffIdx = 0
1084-
ac.mu.Unlock()
1085-
break addrLoop
1086-
default:
1087-
// Despite having set state to READY, in hybrid mode we
1088-
// consider this a failure and continue connecting at the
1089-
// next addr in the list.
1090-
ac.mu.Lock()
1091-
if ac.state == connectivity.Shutdown {
1092-
ac.mu.Unlock()
1093-
return
1094-
}
1095-
1096-
ac.updateConnectivityState(connectivity.TransientFailure)
1097-
ac.mu.Unlock()
1098-
1099-
if tryNextAddrFromStart.HasFired() {
1100-
break addrLoop
1101-
}
1043+
healthCheckConfig := ac.cc.healthCheckConfig()
1044+
// LB channel health checking is only enabled when all the four requirements below are met:
1045+
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1046+
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1047+
// 3. a service config with non-empty healthCheckConfig field is provided,
1048+
// 4. the current load balancer allows it.
1049+
healthcheckManagingState := false
1050+
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1051+
if ac.cc.dopts.healthCheckFunc == nil {
1052+
// TODO: add a link to the health check doc in the error message.
1053+
grpclog.Error("the client side LB channel health check function has not been set.")
1054+
} else {
1055+
// TODO(deklerk) refactor to just return transport
1056+
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1057+
healthcheckManagingState = true
11021058
}
1103-
} else {
1104-
// In RequireHandshakeOn mode, we would have already waited for
1105-
// the server preface, so we consider this a success and restart
1106-
// from the top of the addr list. In RequireHandshakeOff mode,
1107-
// we don't care to wait for the server preface before
1108-
// considering this a success, so we also restart from the top
1109-
// of the addr list.
1059+
}
1060+
if !healthcheckManagingState {
11101061
ac.mu.Lock()
1111-
ac.backoffIdx = 0
1062+
ac.updateConnectivityState(connectivity.Ready)
11121063
ac.mu.Unlock()
1113-
break addrLoop
11141064
}
1065+
1066+
<-reconnect.Done()
1067+
hcancel()
1068+
1069+
// In RequireHandshakeOn mode, we would have already waited for
1070+
// the server preface, so we consider this a success and restart
1071+
// from the top of the addr list. In RequireHandshakeOff mode,
1072+
// we don't care to wait for the server preface before
1073+
// considering this a success, so we also restart from the top
1074+
// of the addr list.
1075+
ac.mu.Lock()
1076+
ac.backoffIdx = 0
1077+
ac.mu.Unlock()
1078+
break addrLoop
11151079
}
11161080

11171081
// After exhausting all addresses, or after need to reconnect after a
@@ -1184,38 +1148,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
11841148
}
11851149

11861150
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1187-
1188-
if err == nil {
1189-
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1190-
select {
1191-
case <-prefaceTimer.C:
1192-
// We didn't get the preface in time.
1193-
newTr.Close()
1194-
err = errors.New("timed out waiting for server handshake")
1195-
case <-prefaceReceived:
1196-
// We got the preface - huzzah! things are good.
1197-
case <-onCloseCalled:
1198-
// The transport has already closed - noop.
1199-
return nil, errors.New("connection closed")
1200-
}
1201-
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1202-
go func() {
1203-
select {
1204-
case <-prefaceTimer.C:
1205-
// We didn't get the preface in time.
1206-
newTr.Close()
1207-
case <-prefaceReceived:
1208-
// We got the preface just in the nick of time - huzzah!
1209-
case <-onCloseCalled:
1210-
// The transport has already closed - noop.
1211-
}
1212-
}()
1213-
}
1214-
}
1215-
12161151
if err != nil {
12171152
// newTr is either nil, or closed.
1218-
ac.cc.blockingpicker.updateConnectionError(err)
12191153
ac.mu.Lock()
12201154
if ac.state == connectivity.Shutdown {
12211155
// ac.tearDown(...) has been invoked.
@@ -1228,6 +1162,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
12281162
return nil, err
12291163
}
12301164

1165+
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1166+
select {
1167+
case <-prefaceTimer.C:
1168+
// We didn't get the preface in time.
1169+
newTr.Close()
1170+
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
1171+
return nil, errors.New("timed out waiting for server handshake")
1172+
case <-prefaceReceived:
1173+
// We got the preface - huzzah! things are good.
1174+
case <-onCloseCalled:
1175+
// The transport has already closed - noop.
1176+
return nil, errors.New("connection closed")
1177+
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
1178+
}
1179+
}
1180+
12311181
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
12321182
ac.mu.Lock()
12331183
if ac.state == connectivity.Shutdown {

0 commit comments

Comments
 (0)