From d61f87fb837501c0288facbc2e171ba6d03aad4d Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Fri, 13 Sep 2019 10:23:08 -0700 Subject: [PATCH 1/6] Issue #5038 : Allow sending back stale shard name data if requested, instead of zeroing it out. Signed-off-by: Jacques Grove --- go/vt/srvtopo/discover.go | 2 +- go/vt/srvtopo/keyspace_filtering_server.go | 3 ++- go/vt/srvtopo/resilient_server.go | 6 ++++-- go/vt/srvtopo/resolver.go | 2 +- go/vt/srvtopo/server.go | 2 +- go/vt/vtexplain/vtexplain_topo.go | 2 +- 6 files changed, 10 insertions(+), 7 deletions(-) diff --git a/go/vt/srvtopo/discover.go b/go/vt/srvtopo/discover.go index e2a3a8530d1..76d1b0fce84 100644 --- a/go/vt/srvtopo/discover.go +++ b/go/vt/srvtopo/discover.go @@ -33,7 +33,7 @@ import ( // for the provided tablet types. It returns one Target object per // keyspace / shard / matching TabletType. func FindAllTargets(ctx context.Context, ts Server, cell string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) { - ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell) + ksNames, err := ts.GetSrvKeyspaceNames(ctx, cell, true) if err != nil { return nil, err } diff --git a/go/vt/srvtopo/keyspace_filtering_server.go b/go/vt/srvtopo/keyspace_filtering_server.go index 6e272cf1939..4ba4b17e3d8 100644 --- a/go/vt/srvtopo/keyspace_filtering_server.go +++ b/go/vt/srvtopo/keyspace_filtering_server.go @@ -72,8 +72,9 @@ func (ksf keyspaceFilteringServer) GetTopoServer() (*topo.Server, error) { func (ksf keyspaceFilteringServer) GetSrvKeyspaceNames( ctx context.Context, cell string, + staleOK bool, ) ([]string, error) { - keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell) + keyspaces, err := ksf.server.GetSrvKeyspaceNames(ctx, cell, staleOK) ret := make([]string, 0, len(keyspaces)) for _, ks := range keyspaces { if ksf.selectKeyspaces[ks] { diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index 6340bfe6551..ee4150f6d0a 100644 --- a/go/vt/srvtopo/resilient_server.go 
+++ b/go/vt/srvtopo/resilient_server.go @@ -228,7 +228,7 @@ func (server *ResilientServer) GetTopoServer() (*topo.Server, error) { } // GetSrvKeyspaceNames returns all keyspace names for the given cell. -func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { +func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) { server.counts.Add(queryCategory, 1) // find the entry in the cache, add it if not there @@ -257,7 +257,7 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str // If it is not time to check again, then return either the cached // value or the cached error but don't ask consul again. if !shouldRefresh { - if cacheValid { + if cacheValid || staleOK { return entry.value, nil } return nil, entry.lastError @@ -283,6 +283,8 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str if err == nil { // save the value we got and the current time in the cache entry.insertionTime = time.Now() + // Avoid a tiny race if TTL == refresh time (the default) + entry.lastQueryTime = entry.insertionTime entry.value = result } else { server.counts.Add(errorCategory, 1) diff --git a/go/vt/srvtopo/resolver.go b/go/vt/srvtopo/resolver.go index e67cebe5d7c..429a8a89d12 100644 --- a/go/vt/srvtopo/resolver.go +++ b/go/vt/srvtopo/resolver.go @@ -149,7 +149,7 @@ func (r *Resolver) GetAllShards(ctx context.Context, keyspace string, tabletType // GetAllKeyspaces returns all the known keyspaces in the local cell. 
func (r *Resolver) GetAllKeyspaces(ctx context.Context) ([]string, error) { - keyspaces, err := r.topoServ.GetSrvKeyspaceNames(ctx, r.localCell) + keyspaces, err := r.topoServ.GetSrvKeyspaceNames(ctx, r.localCell, true) if err != nil { return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "keyspace names fetch error: %v", err) } diff --git a/go/vt/srvtopo/server.go b/go/vt/srvtopo/server.go index b783c78d3f8..c5a94841d6d 100644 --- a/go/vt/srvtopo/server.go +++ b/go/vt/srvtopo/server.go @@ -37,7 +37,7 @@ type Server interface { // GetSrvKeyspaceNames returns the list of keyspaces served in // the provided cell. - GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) + GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) // GetSrvKeyspace returns the SrvKeyspace for a cell/keyspace. GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) diff --git a/go/vt/vtexplain/vtexplain_topo.go b/go/vt/vtexplain/vtexplain_topo.go index e4e0f374969..6c179e8fe4c 100644 --- a/go/vt/vtexplain/vtexplain_topo.go +++ b/go/vt/vtexplain/vtexplain_topo.go @@ -60,7 +60,7 @@ func (et *ExplainTopo) GetTopoServer() (*topo.Server, error) { } // GetSrvKeyspaceNames is part of the srvtopo.Server interface. -func (et *ExplainTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { +func (et *ExplainTopo) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) { et.Lock.Lock() defer et.Lock.Unlock() From bad7398836e6a7f78ad7ccc75bd707a6922c1ff5 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Wed, 26 Feb 2020 14:24:28 -0800 Subject: [PATCH 2/6] Fix new usages of GetSrvKeyspaceNames. Fix some tests. 
Signed-off-by: Jacques Grove --- go/vt/srvtopo/keyspace_filtering_server_test.go | 2 +- go/vt/srvtopo/resilient_server_test.go | 10 +++++----- go/vt/srvtopo/srvtopotest/passthrough.go | 2 +- go/vt/vtgate/sandbox_test.go | 2 +- go/vt/vtgate/vstream_manager.go | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go/vt/srvtopo/keyspace_filtering_server_test.go b/go/vt/srvtopo/keyspace_filtering_server_test.go index f2c3e282362..b39a1b7cac1 100644 --- a/go/vt/srvtopo/keyspace_filtering_server_test.go +++ b/go/vt/srvtopo/keyspace_filtering_server_test.go @@ -89,7 +89,7 @@ func doTestGetSrvKeyspaceNames( want []string, wantErr error, ) { - got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell) + got, gotErr := f.GetSrvKeyspaceNames(stockCtx, cell, false) if got == nil { t.Errorf("GetSrvKeyspaceNames failed: should not return nil") diff --git a/go/vt/srvtopo/resilient_server_test.go b/go/vt/srvtopo/resilient_server_test.go index f3571e1adba..73a7da936a4 100644 --- a/go/vt/srvtopo/resilient_server_test.go +++ b/go/vt/srvtopo/resilient_server_test.go @@ -498,7 +498,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { ts.UpdateSrvKeyspace(context.Background(), "test_cell", "test_ks2", want) ctx := context.Background() - names, err := rs.GetSrvKeyspaceNames(ctx, "test_cell") + names, err := rs.GetSrvKeyspaceNames(ctx, "test_cell", false) if err != nil { t.Errorf("GetSrvKeyspaceNames unexpected error %v", err) } @@ -523,7 +523,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { // elapses but before the TTL expires start := time.Now() for { - names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell") + names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false) if err != nil { t.Errorf("GetSrvKeyspaceNames unexpected error %v", err) } @@ -541,7 +541,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { // Now wait for it to expire from cache for { - _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell") + _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false) if err != nil { 
break } @@ -569,7 +569,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { start = time.Now() for { - names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell") + names, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", false) if err == nil { break } @@ -599,7 +599,7 @@ func TestGetSrvKeyspaceNames(t *testing.T) { time.Sleep(*srvTopoCacheTTL) timeoutCtx, _ := context.WithTimeout(context.Background(), *srvTopoCacheRefresh*2) - _, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell") + _, err = rs.GetSrvKeyspaceNames(timeoutCtx, "test_cell", false) wantErr := "timed out waiting for keyspace names" if err == nil || err.Error() != wantErr { t.Errorf("expected error '%v', got '%v'", wantErr, err.Error()) diff --git a/go/vt/srvtopo/srvtopotest/passthrough.go b/go/vt/srvtopo/srvtopotest/passthrough.go index 3aa4dc86ff5..591960f9f9d 100644 --- a/go/vt/srvtopo/srvtopotest/passthrough.go +++ b/go/vt/srvtopo/srvtopotest/passthrough.go @@ -50,7 +50,7 @@ func (srv *PassthroughSrvTopoServer) GetTopoServer() (*topo.Server, error) { } // GetSrvKeyspaceNames implements srvtopo.Server -func (srv *PassthroughSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { +func (srv *PassthroughSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) { return srv.SrvKeyspaceNames, srv.SrvKeyspaceNamesError } diff --git a/go/vt/vtgate/sandbox_test.go b/go/vt/vtgate/sandbox_test.go index c97f1255daf..13b82b6b8b5 100644 --- a/go/vt/vtgate/sandbox_test.go +++ b/go/vt/vtgate/sandbox_test.go @@ -240,7 +240,7 @@ func (sct *sandboxTopo) GetTopoServer() (*topo.Server, error) { } // GetSrvKeyspaceNames is part of the srvtopo.Server interface. 
-func (sct *sandboxTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { +func (sct *sandboxTopo) GetSrvKeyspaceNames(ctx context.Context, cell string, staleOK bool) ([]string, error) { sandboxMu.Lock() defer sandboxMu.Unlock() keyspaces := make([]string, 0, 1) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 4ef60074a4d..c85437ce2de 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -115,7 +115,7 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat if vgtid.ShardGtids[0].Gtid != "current" { return nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "for an empty keyspace, the Gtid value must be 'current': %v", vgtid) } - keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell) + keyspaces, err := vsm.toposerv.GetSrvKeyspaceNames(ctx, vsm.cell, false) if err != nil { return nil, nil, err } From 3775d77e1d4fe32ec063afad8cb63c44933136b9 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Wed, 26 Feb 2020 16:05:07 -0800 Subject: [PATCH 3/6] Simplify stale topo serving codepath. We will now also serve stale topo while the topo cache is refreshing. 
Signed-off-by: Jacques Grove --- go/vt/srvtopo/resilient_server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index 15a05175b00..d4993e2dbad 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -259,13 +259,13 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str entry.mutex.Lock() defer entry.mutex.Unlock() - cacheValid := entry.value != nil && time.Since(entry.insertionTime) < server.cacheTTL + cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < server.cacheTTL || staleOK) shouldRefresh := time.Since(entry.lastQueryTime) > server.cacheRefresh // If it is not time to check again, then return either the cached - // value or the cached error but don't ask consul again. + // value or the cached error but don't ask topo again. if !shouldRefresh { - if cacheValid || staleOK { + if cacheValid { return entry.value, nil } return nil, entry.lastError From 9267c75e02630e3d2334bcee5f68463dda001610 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Wed, 26 Feb 2020 16:22:19 -0800 Subject: [PATCH 4/6] Only allow stale topo data if we haven't disabled caching. 
Signed-off-by: Jacques Grove --- go/vt/srvtopo/resilient_server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index d4993e2dbad..bc5ae4405c0 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -259,7 +259,11 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str entry.mutex.Lock() defer entry.mutex.Unlock() - cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < server.cacheTTL || staleOK) + // Cache iff: + // * entry is still under cache TTL + // * we are accepting stale values AND we haven't disabled caching + // by setting the cacheTTL to zero + cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < server.cacheTTL || (staleOK && server.cacheTTL > 0)) shouldRefresh := time.Since(entry.lastQueryTime) > server.cacheRefresh // If it is not time to check again, then return either the cached From 356a160183c0466804dfb6bb100929a502a1923a Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Thu, 27 Feb 2020 00:53:35 -0800 Subject: [PATCH 5/6] * Limit the period for which we allow stale topo info to 2x the server refresh period after the TTL expires * Add tests Signed-off-by: Jacques Grove --- go/vt/srvtopo/resilient_server.go | 10 +++++----- go/vt/srvtopo/resilient_server_test.go | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/go/vt/srvtopo/resilient_server.go b/go/vt/srvtopo/resilient_server.go index bc5ae4405c0..b96b35b97eb 100644 --- a/go/vt/srvtopo/resilient_server.go +++ b/go/vt/srvtopo/resilient_server.go @@ -259,11 +259,11 @@ func (server *ResilientServer) GetSrvKeyspaceNames(ctx context.Context, cell str entry.mutex.Lock() defer entry.mutex.Unlock() - // Cache iff: - // * entry is still under cache TTL - // * we are accepting stale values AND we haven't disabled caching - // by setting the cacheTTL to zero - cacheValid := entry.value != nil && 
(time.Since(entry.insertionTime) < server.cacheTTL || (staleOK && server.cacheTTL > 0)) + cacheValid := entry.value != nil && (time.Since(entry.insertionTime) < server.cacheTTL) + if !cacheValid && staleOK { + // Only allow stale results for a bounded period + cacheValid = entry.value != nil && (time.Since(entry.insertionTime) < (server.cacheTTL + 2*server.cacheRefresh)) + } shouldRefresh := time.Since(entry.lastQueryTime) > server.cacheRefresh // If it is not time to check again, then return either the cached diff --git a/go/vt/srvtopo/resilient_server_test.go b/go/vt/srvtopo/resilient_server_test.go index 73a7da936a4..0b4e6d49869 100644 --- a/go/vt/srvtopo/resilient_server_test.go +++ b/go/vt/srvtopo/resilient_server_test.go @@ -557,6 +557,20 @@ func TestGetSrvKeyspaceNames(t *testing.T) { t.Errorf("got error %v want %v", err, forceErr) } + // Now, since the TTL has expired, check that when we ask for stale + // info, we'll get it. + _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true) + if err != nil { + t.Fatalf("expected no error if asking for stale cache data") + } + + // Now, wait long enough that with a stale ask, we'll get an error + time.Sleep(*srvTopoCacheRefresh*2 + 2*time.Millisecond) + _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true) + if err == nil || err != forceErr { + t.Fatalf("expected error if asking for really stale cache data") + } + // Check that we only checked the topo service 1 or 2 times during the // period where we got the cached error. 
cachedReqs, ok := rs.counts.Counts()[cachedCategory] From 471dc2f23a94dd7cbfaceefdab5dfaab99680a81 Mon Sep 17 00:00:00 2001 From: Jacques Grove Date: Thu, 27 Feb 2020 01:19:38 -0800 Subject: [PATCH 6/6] Simplify test condition Signed-off-by: Jacques Grove --- go/vt/srvtopo/resilient_server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/srvtopo/resilient_server_test.go b/go/vt/srvtopo/resilient_server_test.go index 0b4e6d49869..f70563661cd 100644 --- a/go/vt/srvtopo/resilient_server_test.go +++ b/go/vt/srvtopo/resilient_server_test.go @@ -567,8 +567,8 @@ func TestGetSrvKeyspaceNames(t *testing.T) { // Now, wait long enough that with a stale ask, we'll get an error time.Sleep(*srvTopoCacheRefresh*2 + 2*time.Millisecond) _, err = rs.GetSrvKeyspaceNames(ctx, "test_cell", true) - if err == nil || err != forceErr { - t.Fatalf("expected error if asking for really stale cache data") + if err != forceErr { + t.Fatalf("expected an error if asking for really stale cache data") } // Check that we only checked the topo service 1 or 2 times during the