Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate read/write network timeouts #161

Merged
merged 9 commits into from
May 11, 2018
101 changes: 38 additions & 63 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,26 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
syncCount uint
cachedIndex map[string]bool
sync chan bool
dial dialer
dialInfo *DialInfo
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
func newCluster(userSeeds []string, info *DialInfo) *mongoCluster {
cluster := &mongoCluster{
userSeeds: userSeeds,
references: 1,
direct: direct,
failFast: failFast,
dial: dial,
setName: setName,
appName: appName,
dial: dialer{info.Dial, info.DialServer},
dialInfo: info,
}
cluster.serverSynced.L = cluster.RWMutex.RLocker()
cluster.sync = make(chan bool, 1)
Expand Down Expand Up @@ -147,7 +139,7 @@ type isMasterResult struct {

func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
// Monotonic let's it talk to a slave and still hold the socket.
session := newSession(Monotonic, cluster, 10*time.Second)
session := newSession(Monotonic, cluster, cluster.dialInfo)
session.setSocket(socket)

var cmd = bson.D{{Name: "isMaster", Value: 1}}
Expand All @@ -171,8 +163,8 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul
}

// Include the application name if set
if cluster.appName != "" {
meta["application"] = bson.M{"name": cluster.appName}
if cluster.dialInfo.AppName != "" {
meta["application"] = bson.M{"name": cluster.dialInfo.AppName}
}

cmd = append(cmd, bson.DocElem{
Expand All @@ -190,27 +182,15 @@ type possibleTimeout interface {
Timeout() bool
}

var syncSocketTimeout = 5 * time.Second

func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) {
var syncTimeout time.Duration
if raceDetector {
// This variable is only ever touched by tests.
globalMutex.Lock()
syncTimeout = syncSocketTimeout
globalMutex.Unlock()
} else {
syncTimeout = syncSocketTimeout
}

addr := server.Addr
log("SYNC Processing ", addr, "...")

// Retry a few times to avoid knocking a server down for a hiccup.
var result isMasterResult
var tryerr error
for retry := 0; ; retry++ {
if retry == 3 || retry == 1 && cluster.failFast {
if retry == 3 || retry == 1 && cluster.dialInfo.FailFast {
return nil, nil, tryerr
}
if retry > 0 {
Expand All @@ -222,16 +202,22 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
time.Sleep(syncShortDelay)
}

// It's not clear what would be a good timeout here. Is it
// better to wait longer or to retry?
socket, _, err := server.AcquireSocket(0, syncTimeout)
// Don't ever hit the pool limit for syncing
config := cluster.dialInfo.Copy()
config.PoolLimit = 0

socket, _, err := server.AcquireSocket(config)
if err != nil {
tryerr = err
logf("SYNC Failed to get socket to %s: %v", addr, err)
continue
}
err = cluster.isMaster(socket, &result)

// Restore the correct dial config before returning it to the pool
socket.dialInfo = cluster.dialInfo
socket.Release()

if err != nil {
tryerr = err
logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
Expand All @@ -241,9 +227,9 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
break
}

if cluster.setName != "" && result.SetName != cluster.setName {
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.setName)
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.setName)
if cluster.dialInfo.ReplicaSetName != "" && result.SetName != cluster.dialInfo.ReplicaSetName {
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
}

if result.IsMaster {
Expand All @@ -255,7 +241,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
}
} else if result.Secondary {
debugf("SYNC %s is a slave.", addr)
} else if cluster.direct {
} else if cluster.dialInfo.Direct {
logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr)
} else {
logf("SYNC %s is neither a master nor a slave.", addr)
Expand Down Expand Up @@ -386,7 +372,7 @@ func (cluster *mongoCluster) syncServersLoop() {
break
}
cluster.references++ // Keep alive while syncing.
direct := cluster.direct
direct := cluster.dialInfo.Direct
Copy link

@szank szank May 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster.dialInfo is a pointer, so it being inside the cluster lock does not make sense. ( I might be missing something ), but I think it would be more intuitive to move this assignment out of the critical section.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've kept the locking semantics from the existing code, it was confusing enough tracing the timeouts around the codebase without taking into account the locking choices - the locking needs a review in general.

cluster.Unlock()

cluster.syncServersIteration(direct)
Expand All @@ -401,7 +387,7 @@ func (cluster *mongoCluster) syncServersLoop() {

// Hold off before allowing another sync. No point in
// burning CPU looking for down servers.
if !cluster.failFast {
if !cluster.dialInfo.FailFast {
time.Sleep(syncShortDelay)
}

Expand Down Expand Up @@ -439,13 +425,11 @@ func (cluster *mongoCluster) syncServersLoop() {
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
minPoolSize := cluster.minPoolSize
maxIdleTimeMS := cluster.maxIdleTimeMS
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.dialInfo)
}

func resolveAddr(addr string) (*net.TCPAddr, error) {
Expand Down Expand Up @@ -614,19 +598,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
cluster.Unlock()
}

// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
}

// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can syncTimeout be moved to DialInfo as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes more of my life than I am willing to give.

var started time.Time
var syncCount uint
for {
Expand All @@ -645,7 +620,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
// Initialize after fast path above.
started = time.Now()
syncCount = cluster.syncCount
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount {
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.dialInfo.FailFast && cluster.syncCount != syncCount {
cluster.RUnlock()
return nil, errors.New("no reachable servers")
}
Expand All @@ -670,7 +645,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
continue
}

s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
s, abended, err := server.AcquireSocketWithBlocking(info)
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
Expand Down
2 changes: 0 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,6 @@ func (s *S) TestSocketTimeoutOnDial(c *C) {

timeout := 1 * time.Second

defer mgo.HackSyncSocketTimeout(timeout)()

s.Freeze("localhost:40001")

started := time.Now()
Expand Down
14 changes: 0 additions & 14 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,6 @@ func HackPingDelay(newDelay time.Duration) (restore func()) {
return
}

func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) {
globalMutex.Lock()
defer globalMutex.Unlock()

oldTimeout := syncSocketTimeout
restore = func() {
globalMutex.Lock()
syncSocketTimeout = oldTimeout
globalMutex.Unlock()
}
syncSocketTimeout = newTimeout
return
}

func (s *Session) Cluster() *mongoCluster {
return s.cluster()
}
Expand Down
Loading