Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go/vt/discovery/tablet_stats_cache_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (a TargetArray) Less(i, j int) bool {
func TestFindAllKeyspaceShards(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("cell1", "cell2")
flag.Set("srv_topo_cache_ttl", "0s") // No caching values
flag.Set("srv_topo_cache_ttl", "0s") // No caching values
flag.Set("srv_topo_cache_refresh", "0s") // No caching values
rs := srvtopo.NewResilientServer(ts, "TestFindAllKeyspaceShards")

// No keyspace / shards.
Expand Down
139 changes: 110 additions & 29 deletions go/vt/srvtopo/resilient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,20 @@ import (
)

var (
srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology")
// srvTopoCacheTTL and srvTopoCacheRefresh control the behavior of
// the caching for both watched and unwatched values.
//
// For entries we don't watch (like the list of Keyspaces), we refresh
// the cached list from the topo after srv_topo_cache_refresh elapses.
// If the fetch fails, we hold onto the cached value until
// srv_topo_cache_ttl elapses.
//
// For entries we watch (like the SrvKeyspace for a given cell), if
// setting the watch fails, we will use the last known value until
// srv_topo_cache_ttl elapses and we only try to re-establish the watch
// once every srv_topo_cache_refresh interval.
srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology")
srvTopoCacheRefresh = flag.Duration("srv_topo_cache_refresh", 1*time.Second, "how frequently to refresh the topology for cached entries")
)

const (
Expand Down Expand Up @@ -96,9 +109,10 @@ const (
// - limit the QPS to the underlying topo.Server
// - return the last known value of the data if there is an error
type ResilientServer struct {
topoServer *topo.Server
cacheTTL time.Duration
counts *stats.Counters
topoServer *topo.Server
cacheTTL time.Duration
cacheRefresh time.Duration
counts *stats.Counters

// mutex protects the cache map itself, not the individual
// values in the cache.
Expand All @@ -115,6 +129,7 @@ type srvKeyspaceNamesEntry struct {
mutex sync.Mutex

insertionTime time.Time
lastQueryTime time.Time
value []string
lastError error
lastErrorCtx context.Context
Expand All @@ -134,28 +149,42 @@ type srvKeyspaceEntry struct {
//
// if watchrunning is not set, the next time we try to access the
// keyspace, we will start a watch.
// if watchrunning is set, we are guaranteed to have exactly one of
// value or lastError be nil, and the other non-nil.
// if watchrunning is set, we are guaranteed to have lastError be
// nil and an up-to-date value (which may be nil)
watchRunning bool
value *topodatapb.SrvKeyspace
lastError error

// lastValueTime is the time when the watch last obtained a non-nil value.
// It is compared to the TTL to determine if we can return the value
// when the watch is failing.
lastValueTime time.Time

// lastErrorCtx tries to remember the context of the query
// that failed to get the SrvKeyspace, so we can display it in
// the status UI. The background routine that refreshes the
// keyspace will not populate this field.
// The intent is to have the source of a query that for instance
// has a bad keyspace or cell name.
lastErrorCtx context.Context

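// lastErrorTime records the time that the watch failed, so that
// any requests that come in before the refresh interval has elapsed
// are answered with the cached value (or the last error) instead of
// trying to start the watch again.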
lastErrorTime time.Time
}

// NewResilientServer creates a new ResilientServer
// based on the provided topo.Server.
func NewResilientServer(base *topo.Server, counterPrefix string) *ResilientServer {
if *srvTopoCacheRefresh > *srvTopoCacheTTL {
log.Fatalf("srv_topo_cache_refresh must be less than or equal to srv_topo_cache_ttl")
}

return &ResilientServer{
topoServer: base,
cacheTTL: *srvTopoCacheTTL,
counts: stats.NewCounters(counterPrefix + "Counts"),
topoServer: base,
cacheTTL: *srvTopoCacheTTL,
cacheRefresh: *srvTopoCacheRefresh,
counts: stats.NewCounters(counterPrefix + "Counts"),

srvKeyspaceNamesCache: make(map[string]*srvKeyspaceNamesEntry),
srvKeyspaceCache: make(map[string]*srvKeyspaceEntry),
Expand Down Expand Up @@ -184,29 +213,45 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str
entry.mutex.Lock()
defer entry.mutex.Unlock()

// If the entry is fresh enough, return it
if time.Now().Sub(entry.insertionTime) < server.cacheTTL {
return entry.value, entry.lastError
// If it is not time to check again, then return either the cached
// value or the cached error
cacheValid := entry.value != nil && time.Since(entry.insertionTime) < server.cacheTTL
shouldRefresh := time.Since(entry.lastQueryTime) > server.cacheRefresh

if !shouldRefresh {
if cacheValid {
return entry.value, nil
}
return nil, entry.lastError
}

// Not in cache or too old, get the real value. We use the context that issued
// the query here.
// Not in cache or needs refresh so try to get the real value.
// We use the context that issued the query here.
result, err := server.topoServer.GetSrvKeyspaceNames(ctx, cell)
if err != nil {
if err == nil {
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.value = result
} else {
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvKeyspaceNames(%v, %v) failed: %v (no cached value, caching and returning error)", ctx, cell, err)
} else {

} else if cacheValid {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspaceNames(%v, %v) failed: %v (returning cached value: %v %v)", ctx, cell, err, entry.value, entry.lastError)
return entry.value, entry.lastError
result = entry.value
err = nil
} else {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvKeyspaceNames(%v, %v) failed: %v (cached value expired)", ctx, cell, err)
entry.insertionTime = time.Time{}
entry.value = nil
}
}

// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastQueryTime = time.Now()
entry.lastErrorCtx = ctx
return result, err
}
Expand Down Expand Up @@ -259,37 +304,72 @@ func (server *ResilientServer) GetSrvKeyspace(ctx context.Context, cell, keyspac
return entry.value, entry.lastError
}

// Watch is not running, let's try to start it.
// Watch is not running, but check if the last time we got an error was
// more recent than the refresh interval.
//
// If so return either the last cached value or the last error we got.
cacheValid := entry.value != nil && time.Since(entry.lastValueTime) < server.cacheTTL
shouldRefresh := time.Since(entry.lastErrorTime) > server.cacheRefresh

if !shouldRefresh {
if cacheValid {
server.counts.Add(cachedCategory, 1)
return entry.value, nil
}
return nil, entry.lastError
}

// Time to try to start the watch again.
// We use a background context, as starting the watch should keep going
// even if the current query context is short-lived.
newCtx := context.Background()
current, changes, _ := server.topoServer.WatchSrvKeyspace(newCtx, cell, keyspace)
current, changes, cancel := server.topoServer.WatchSrvKeyspace(newCtx, cell, keyspace)
if current.Err != nil {
// lastError and lastErrorCtx will be visible from the UI
// until the next try
entry.value = nil
entry.lastError = current.Err
entry.lastErrorCtx = ctx
log.Errorf("WatchSrvKeyspace failed for %v/%v: %v", cell, keyspace, current.Err)
entry.lastErrorTime = time.Now()

// if the node disappears, delete the cached value
if current.Err == topo.ErrNoNode {
entry.value = nil
}

server.counts.Add(errorCategory, 1)
log.Errorf("Initial WatchSrvKeyspace failed for %v/%v: %v", cell, keyspace, current.Err)

if cacheValid {
return entry.value, nil
}

return nil, current.Err
}

// we are now watching, cache the first notification
entry.watchRunning = true
entry.value = current.Value
entry.lastValueTime = time.Now()
entry.lastError = nil
entry.lastErrorCtx = nil

go func() {
defer cancel()

for c := range changes {
if c.Err != nil {
// Watch errored out. We log it, clear
// our record, and return.
err := fmt.Errorf("watch for SrvKeyspace %v in cell %v failed: %v", keyspace, cell, c.Err)
// Watch errored out.
//
// Log it and store the error, but do not clear the value
// so it can be used until the ttl elapses unless the node
// was deleted.
err := fmt.Errorf("WatchSrvKeyspace failed for %v/%v: %v", cell, keyspace, c.Err)
log.Errorf("%v", err)
server.counts.Add(errorCategory, 1)
entry.mutex.Lock()
if c.Err == topo.ErrNoNode {
entry.value = nil
}
entry.watchRunning = false
entry.value = nil
entry.lastError = err
entry.lastErrorCtx = nil
entry.mutex.Unlock()
Expand All @@ -299,6 +379,7 @@ func (server *ResilientServer) GetSrvKeyspace(ctx context.Context, cell, keyspac
// We got a new value, save it.
entry.mutex.Lock()
entry.value = c.Value
entry.lastValueTime = time.Now()
entry.lastError = nil
entry.lastErrorCtx = nil
entry.mutex.Unlock()
Expand Down
Loading