Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ const (
compressionNegotiated // Marks if this connection has negotiated compression level with remote.
didTLSFirst // Marks if this connection requested and was accepted doing the TLS handshake first (prior to INFO).
isSlowConsumer // Marks connection as a slow consumer.
firstPong // Marks if this is the first PONG received
)

// set the flag (would be equivalent to set the boolean to true)
Expand Down Expand Up @@ -2587,6 +2588,14 @@ func (c *client) processPong() {
c.rtt = computeRTT(c.rttStart)
srv := c.srv
reorderGWs := c.kind == GATEWAY && c.gw.outbound
firstPong := c.flags.setIfNotSet(firstPong)
var ri *routeInfo
// When receiving the first PONG, for a route with pooling, we may be
// instructed to start a new route.
if firstPong && c.kind == ROUTER && c.route != nil {
ri = c.route.startNewRoute
c.route.startNewRoute = nil
}
// If compression is currently active for a route/leaf connection, if the
// compression configuration is s2_auto, check if we should change
// the compression level.
Expand All @@ -2605,6 +2614,11 @@ func (c *client) processPong() {
if reorderGWs {
srv.gateway.orderOutboundConnections()
}
if ri != nil {
srv.startGoRoutine(func() {
srv.connectToRoute(ri.url, ri.rtype, true, ri.gossipMode, _EMPTY_)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also include the `case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):` from here? https://github.com/nats-io/nats-server/pull/7200/files#diff-e02d4430ca4d47be9db1b632ddb92d22c34c0919348bc020ce809e64fca8631cL2386-L2394

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed. The routes will now be more serialized than before, and even then, I am not sure this was really needed (I think I had that for the case where both servers connect to each other, A <-> B, which could end up closing more routes than I would have liked).

})
}
}

// Select the s2 compression level based on the client's current RTT and the configured
Expand Down
35 changes: 22 additions & 13 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ type route struct {
// Transient value used to set the Info.GossipMode when initiating
// an implicit route and sending to the remote.
gossipMode byte
// This will be set in case of pooling so that a route can trigger
// the creation of the next after receiving the first PONG, ensuring
// that authentication did not fail.
startNewRoute *routeInfo
}

// routeInfo contains the information required to create a new route.
// The fields are handed, in order, to connectToRoute when the new route
// is started.
type routeInfo struct {
// URL of the remote to connect to.
url *url.URL
// Type of the route to create.
rtype RouteType
// Gossip mode to use for the new route.
gossipMode byte
}

// Do not change the values/order since they are exchanged between servers.
Expand Down Expand Up @@ -2380,20 +2391,18 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMod
// Send the subscriptions interest.
s.sendSubsToRoute(c, idx, _EMPTY_)

// In pool mode, if we did not yet reach the cap, try to connect a new connection
// In pool mode, if we did not yet reach the cap, try to connect a new connection,
// but do so only after receiving the first PONG to our PING, which will ensure
// that we have proper authentication.
if pool && didSolicit && sz != effectivePoolSize {
s.startGoRoutine(func() {
select {
case <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond):
case <-s.quitCh:
// Doing this here and not as a defer because connectToRoute is also
// calling s.grWG.Done() on exit, so we do this only if we don't
// invoke connectToRoute().
s.grWG.Done()
return
}
s.connectToRoute(url, rtype, true, gossipMode, _EMPTY_)
})
c.mu.Lock()
c.route.startNewRoute = &routeInfo{
url: url,
rtype: rtype,
gossipMode: gossipMode,
}
c.sendPing()
c.mu.Unlock()
}
}
s.mu.Unlock()
Expand Down
62 changes: 60 additions & 2 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,12 @@ func checkClusterFormed(t testing.TB, servers ...*Server) {
if a == b {
continue
}
if b.getOpts().Cluster.PoolSize < 0 {
bo := b.getOpts()
if ps := bo.Cluster.PoolSize; ps < 0 {
total++
} else {
total += nr
bps := ps + len(bo.Cluster.PinnedAccounts)
total += max(nr, bps)
}
}
enr = append(enr, total)
Expand Down Expand Up @@ -3740,6 +3742,62 @@ func TestRoutePoolWithOlderServerConnectAndReconnect(t *testing.T) {
checkRepeatConnect()
}

// TestRoutePoolBadAuthNoRunawayCreateRoute starts S1 with cluster
// authorization and S2 with a pooled route to S1 that uses wrong credentials.
// It checks that the number of errors logged by S1 over a 2 second window
// stays small, then reloads S2 with the proper credentials and checks that
// the cluster forms.
func TestRoutePoolBadAuthNoRunawayCreateRoute(t *testing.T) {
	conf1 := createConfFile(t, []byte(`
server_name: "S1"
listen: "127.0.0.1:-1"
cluster {
name: "local"
listen: "127.0.0.1:-1"
pool_size: 4
authorization {
user: "correct"
password: "correct"
timeout: 5
}
}
`))
	s1, o1 := RunServerWithConfig(conf1)
	defer s1.Shutdown()

	// Capture the errors logged by S1 so that they can be counted below.
	l := &captureErrorLogger{errCh: make(chan string, 100)}
	s1.SetLogger(l, false, false)

	tmpl := `
server_name: "S2"
listen: "127.0.0.1:-1"
cluster {
name: "local"
listen: "127.0.0.1:-1"
pool_size: 5
routes: ["nats://%s@127.0.0.1:%d"]
}
`
	conf2 := createConfFile(t, fmt.Appendf(nil, tmpl, "incorrect:incorrect", o1.Cluster.Port))
	s2, _ := RunServerWithConfig(conf2)
	defer s2.Shutdown()

	// Count the errors reported during a 2 second window. Block on the
	// channels instead of spinning so the test does not burn a CPU core
	// while waiting.
	var errCount int
	timeout := time.After(2 * time.Second)
collect:
	for {
		select {
		case <-l.errCh:
			errCount++
		case <-timeout:
			break collect
		}
	}
	// We should not get that many errors now. In the past, we would get more
	// than 200 for the 2 sec wait.
	if errCount > 10 {
		t.Fatalf("Unexpected number of errors: %v", errCount)
	}

	// Reload with proper credentials.
	reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "correct:correct", o1.Cluster.Port))
	// Ensure we can connect.
	checkClusterFormed(t, s1, s2)
}

func TestRouteCompressionOptions(t *testing.T) {
org := testDefaultClusterCompression
testDefaultClusterCompression = _EMPTY_
Expand Down
Loading