Skip to content

Commit

Permalink
Separate read/write network timeouts (#161)
Browse files Browse the repository at this point in the history
* socket: separate read/write network timeouts

Splits DialInfo.Timeout (defaults to 60s when using mgo.Dial()) into ReadTimeout
and WriteTimeout to address #160. Read/write timeout defaults to
DialInfo.Timeout to preserve existing behaviour.

* cluster: remove AcquireSocket

Only used by tests, replaced by the pool-aware acquire socket functions:
	* AcquireSocketWithPoolTimeout
	* AcquireSocketWithBlocking

* cluster: use configured timeouts for cluster operations

* `mongoCluster.syncServer()` no longer uses hard-coded 5 seconds
* `mongoCluster.isMaster()` no longer uses hard-coded 10 seconds

* tests: use DialInfo for internal timeouts

* server: fix fantastic serverTags nil slice bug

When unmarshalling serverTags, it is now an empty slice, instead of a nil slice.

`len(thing) == 0` works all the time, regardless.

* cluster: remove unused duplicate pool config

* session: avoid calculating default values in hot path

Changes `DialWithInfo` to handle setting default values by setting the relevant
`DialInfo` field, rather than calling the respective methods in the hot path for:

	* `PoolLimit`
	* `ReadTimeout`
	* `WriteTimeout`

* session: remove unused consts

* session: update docs
  • Loading branch information
domodwyer authored May 11, 2018
1 parent 45151e7 commit 72d0ac2
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 179 deletions.
101 changes: 38 additions & 63 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,26 @@ import (

type mongoCluster struct {
sync.RWMutex
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
direct bool
failFast bool
syncCount uint
setName string
cachedIndex map[string]bool
sync chan bool
dial dialer
appName string
minPoolSize int
maxIdleTimeMS int
serverSynced sync.Cond
userSeeds []string
dynaSeeds []string
servers mongoServers
masters mongoServers
references int
syncing bool
syncCount uint
cachedIndex map[string]bool
sync chan bool
dial dialer
dialInfo *DialInfo
}

func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster {
func newCluster(userSeeds []string, info *DialInfo) *mongoCluster {
cluster := &mongoCluster{
userSeeds: userSeeds,
references: 1,
direct: direct,
failFast: failFast,
dial: dial,
setName: setName,
appName: appName,
dial: dialer{info.Dial, info.DialServer},
dialInfo: info,
}
cluster.serverSynced.L = cluster.RWMutex.RLocker()
cluster.sync = make(chan bool, 1)
Expand Down Expand Up @@ -147,7 +139,7 @@ type isMasterResult struct {

func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
// Monotonic let's it talk to a slave and still hold the socket.
session := newSession(Monotonic, cluster, 10*time.Second)
session := newSession(Monotonic, cluster, cluster.dialInfo)
session.setSocket(socket)

var cmd = bson.D{{Name: "isMaster", Value: 1}}
Expand All @@ -171,8 +163,8 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul
}

// Include the application name if set
if cluster.appName != "" {
meta["application"] = bson.M{"name": cluster.appName}
if cluster.dialInfo.AppName != "" {
meta["application"] = bson.M{"name": cluster.dialInfo.AppName}
}

cmd = append(cmd, bson.DocElem{
Expand All @@ -190,27 +182,15 @@ type possibleTimeout interface {
Timeout() bool
}

var syncSocketTimeout = 5 * time.Second

func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) {
var syncTimeout time.Duration
if raceDetector {
// This variable is only ever touched by tests.
globalMutex.Lock()
syncTimeout = syncSocketTimeout
globalMutex.Unlock()
} else {
syncTimeout = syncSocketTimeout
}

addr := server.Addr
log("SYNC Processing ", addr, "...")

// Retry a few times to avoid knocking a server down for a hiccup.
var result isMasterResult
var tryerr error
for retry := 0; ; retry++ {
if retry == 3 || retry == 1 && cluster.failFast {
if retry == 3 || retry == 1 && cluster.dialInfo.FailFast {
return nil, nil, tryerr
}
if retry > 0 {
Expand All @@ -222,16 +202,22 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
time.Sleep(syncShortDelay)
}

// It's not clear what would be a good timeout here. Is it
// better to wait longer or to retry?
socket, _, err := server.AcquireSocket(0, syncTimeout)
// Don't ever hit the pool limit for syncing
config := cluster.dialInfo.Copy()
config.PoolLimit = 0

socket, _, err := server.AcquireSocket(config)
if err != nil {
tryerr = err
logf("SYNC Failed to get socket to %s: %v", addr, err)
continue
}
err = cluster.isMaster(socket, &result)

// Restore the correct dial config before returning it to the pool
socket.dialInfo = cluster.dialInfo
socket.Release()

if err != nil {
tryerr = err
logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
Expand All @@ -241,9 +227,9 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
break
}

if cluster.setName != "" && result.SetName != cluster.setName {
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.setName)
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.setName)
if cluster.dialInfo.ReplicaSetName != "" && result.SetName != cluster.dialInfo.ReplicaSetName {
logf("SYNC Server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
return nil, nil, fmt.Errorf("server %s is not a member of replica set %q", addr, cluster.dialInfo.ReplicaSetName)
}

if result.IsMaster {
Expand All @@ -255,7 +241,7 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerI
}
} else if result.Secondary {
debugf("SYNC %s is a slave.", addr)
} else if cluster.direct {
} else if cluster.dialInfo.Direct {
logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr)
} else {
logf("SYNC %s is neither a master nor a slave.", addr)
Expand Down Expand Up @@ -386,7 +372,7 @@ func (cluster *mongoCluster) syncServersLoop() {
break
}
cluster.references++ // Keep alive while syncing.
direct := cluster.direct
direct := cluster.dialInfo.Direct
cluster.Unlock()

cluster.syncServersIteration(direct)
Expand All @@ -401,7 +387,7 @@ func (cluster *mongoCluster) syncServersLoop() {

// Hold off before allowing another sync. No point in
// burning CPU looking for down servers.
if !cluster.failFast {
if !cluster.dialInfo.FailFast {
time.Sleep(syncShortDelay)
}

Expand Down Expand Up @@ -439,13 +425,11 @@ func (cluster *mongoCluster) syncServersLoop() {
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
minPoolSize := cluster.minPoolSize
maxIdleTimeMS := cluster.maxIdleTimeMS
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS)
return newServer(addr, tcpaddr, cluster.sync, cluster.dial, cluster.dialInfo)
}

func resolveAddr(addr string) (*net.TCPAddr, error) {
Expand Down Expand Up @@ -614,19 +598,10 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
cluster.Unlock()
}

// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0)
}

// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration,
) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(mode Mode, slaveOk bool, syncTimeout time.Duration, serverTags []bson.D, info *DialInfo) (s *mongoSocket, err error) {
var started time.Time
var syncCount uint
for {
Expand All @@ -645,7 +620,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
// Initialize after fast path above.
started = time.Now()
syncCount = cluster.syncCount
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.failFast && cluster.syncCount != syncCount {
} else if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) || cluster.dialInfo.FailFast && cluster.syncCount != syncCount {
cluster.RUnlock()
return nil, errors.New("no reachable servers")
}
Expand All @@ -670,7 +645,7 @@ func (cluster *mongoCluster) AcquireSocketWithPoolTimeout(
continue
}

s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout)
s, abended, err := server.AcquireSocketWithBlocking(info)
if err == errPoolTimeout {
// No need to remove servers from the topology if acquiring a socket fails for this reason.
return nil, err
Expand Down
2 changes: 0 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,8 +1055,6 @@ func (s *S) TestSocketTimeoutOnDial(c *C) {

timeout := 1 * time.Second

defer mgo.HackSyncSocketTimeout(timeout)()

s.Freeze("localhost:40001")

started := time.Now()
Expand Down
14 changes: 0 additions & 14 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,6 @@ func HackPingDelay(newDelay time.Duration) (restore func()) {
return
}

func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) {
globalMutex.Lock()
defer globalMutex.Unlock()

oldTimeout := syncSocketTimeout
restore = func() {
globalMutex.Lock()
syncSocketTimeout = oldTimeout
globalMutex.Unlock()
}
syncSocketTimeout = newTimeout
return
}

func (s *Session) Cluster() *mongoCluster {
return s.cluster()
}
Expand Down
Loading

0 comments on commit 72d0ac2

Please sign in to comment.