Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// for the provided tablet types. It returns one Target object per
// keyspace / shard / matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell)
ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion go/vt/srvtopo/keyspace_filtering_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func (ksf keyspaceFilteringServer) GetTopoServer() (*topo.Server, error) {
func (ksf keyspaceFilteringServer) GetSrvKeyspaceNames(
ctx context.Context,
cell string,
staleOK bool,
) ([]string, error) {
keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell)
keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell, staleOK)
ret := make([]string, 0, len(keyspaces))
for _, ks := range keyspaces {
if ksf.selectKeyspaces[ks] {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/keyspace_filtering_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func doTestGetSrvKeyspaceNames(
want []string,
wantErr error,
) {
got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell)
got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell, false)

if got == nil {
t.Errorf("GetSrvKeyspaceNames failed: should not return nil")
Expand Down
12 changes: 9 additions & 3 deletions go/vt/srvtopo/resilient_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (server *ResilientServer) GetTopoServer() (*topo.Server, error) {
}

// GetSrvKeyspaceNames returns all keyspace names for the given cell.
func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) {
server.counts.Add(queryCategory, 1)

// find the entry in the cache, add it if not there
Expand All @@ -259,11 +259,15 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str
entry.mutex.Lock()
defer entry.mutex.Unlock()

cacheValid := entry.value != nil && time.Since(entry.insertionTime) < server.cacheTTL
cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < server.cacheTTL)
if !cacheValid && staleOK {
// Only allow stale results for a bounded period
cacheValid = entry.value != nil && (time.Since(entry.insertionTime) < (server.cacheTTL + 2*server.cacheRefresh))
}
shouldRefresh := time.Since(entry.lastQueryTime) > server.cacheRefresh

// If it is not time to check again, then return either the cached
// value or the cached error but don't ask consul again.
// value or the cached error but don't ask topo again.
if !shouldRefresh {
if cacheValid {
return entry.value, nil
Expand Down Expand Up @@ -291,6 +295,8 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str
if err == nil {
// save the value we got and the current time in the cache
entry.insertionTime = time.Now()
// Avoid a tiny race if TTL == refresh time (the default)
entry.lastQueryTime = entry.insertionTime
entry.value = result
} else {
server.counts.Add(errorCategory, 1)
Expand Down
24 changes: 19 additions & 5 deletions go/vt/srvtopo/resilient_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
ts.UpdateSrvKeyspace(context.Background(), "test_cell", "test_ks2", want)

ctx := context.Background()
names, err := rs.GetSrvKeyspaceNames(ctx, "test_cell")
names, err := rs.GetSrvKeyspaceNames(ctx, "test_cell", false)
if err != nil {
t.Errorf("GetSrvKeyspaceNames unexpected error %v", err)
}
Expand All @@ -523,7 +523,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
// elapses but before the TTL expires
start := time.Now()
for {
names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell")
names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false)
if err != nil {
t.Errorf("GetSrvKeyspaceNames unexpected error %v", err)
}
Expand All @@ -541,7 +541,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {

// Now wait for it to expire from cache
for {
_, err = rs.GetSrvKeyspaceNames(ctx, "test_cell")
_, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false)
if err != nil {
break
}
Expand All @@ -557,6 +557,20 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
t.Errorf("got error %v want %v", err, forceErr)
}

// Now, since the TTL has expired, check that when we ask for stale
// info, we'll get it.
_, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true)
if err != nil {
t.Fatalf("expected no error if asking for stale cache data")
}

// Now, wait long enough that with a stale ask, we'll get an error
time.Sleep(*srvTopoCacheRefresh*2 + 2*time.Millisecond)
_, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true)
if err != forceErr {
t.Fatalf("expected an error if asking for really stale cache data")
}

// Check that we only checked the topo service 1 or 2 times during the
// period where we got the cached error.
cachedReqs, ok := rs.counts.Counts()[cachedCategory]
Expand All @@ -569,7 +583,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {

start = time.Now()
for {
names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell")
names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false)
if err == nil {
break
}
Expand Down Expand Up @@ -599,7 +613,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) {
time.Sleep(*srvTopoCacheTTL)

timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2)
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell")
_, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell", false)
wantErr := "timed out waiting for keyspace names"
if err == nil || err.Error() != wantErr {
t.Errorf("expected error '%v', got '%v'", wantErr, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (r *Resolver) GetAllShards(ctx context.Context, keyspace string, tabletType

// GetAllKeyspaces returns all the known keyspaces in the local cell.
func (r *Resolver) GetAllKeyspaces(ctx context.Context) ([]string, error) {
keyspaces, err := r.topoServ.GetSrvKeyspaceNames(ctx, r.localCell)
keyspaces, err := r.topoServ.GetSrvKeyspaceNames(ctx, r.localCell, true)
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "keyspace names fetch error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Server interface {

// GetSrvKeyspaceNames returns the list of keyspaces served in
// the provided cell.
GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error)
GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error)

// GetSrvKeyspace returns the SrvKeyspace for a cell/keyspace.
GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/srvtopotest/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (srv *PassthroughSrvTopoServer) GetTopoServer() (*topo.Server, error) {
}

// GetSrvKeyspaceNames implements srvtopo.Server
func (srv *PassthroughSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
func (srv *PassthroughSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) {
return srv.SrvKeyspaceNames, srv.SrvKeyspaceNamesError
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_topo.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (et *ExplainTopo) GetTopoServer() (*topo.Server, error) {
}

// GetSrvKeyspaceNames is part of the srvtopo.Server interface.
func (et *ExplainTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
func (et *ExplainTopo) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) {
et.Lock.Lock()
defer et.Lock.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/sandbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (sct *sandboxTopo) GetTopoServer() (*topo.Server, error) {
}

// GetSrvKeyspaceNames is part of the srvtopo.Server interface.
func (sct *sandboxTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) {
func (sct *sandboxTopo) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) {
sandboxMu.Lock()
defer sandboxMu.Unlock()
keyspaces := make([]string, 0, 1)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
if vgtid.ShardGtids[0].Gtid != "current" {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid)
}
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell)
keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false)
if err != nil {
return nil, nil, err
}
Expand Down