-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Fix race in replicationLagModule of go/vt/throttle
#16078
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f53e089
1b49ce7
49e2dc7
e556e2f
df59fcf
1da308b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ package throttler | |
|
|
||
| import ( | ||
| "sort" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "vitess.io/vitess/go/vt/discovery" | ||
|
|
@@ -30,6 +31,8 @@ type replicationLagCache struct { | |
| // The map key is replicationLagRecord.LegacyTabletStats.Key. | ||
| entries map[string]*replicationLagHistory | ||
|
|
||
| mu sync.Mutex | ||
|
|
||
| // slowReplicas is a set of slow replicas. | ||
| // The map key is replicationLagRecord.LegacyTabletStats.Key. | ||
| // This map will always be recomputed by sortByLag() and must not be modified | ||
|
|
@@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache | |
|
|
||
| // add inserts or updates "r" in the cache for the replica with the key "r.Key". | ||
| func (c *replicationLagCache) add(r replicationLagRecord) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| if !r.Serving { | ||
| // Tablet is down. Do no longer track it. | ||
| delete(c.entries, discovery.TabletToMapKey(r.Tablet)) | ||
|
|
@@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) { | |
| entry.add(r) | ||
| } | ||
|
|
||
| // maxLag returns the maximum replication lag for the entries in cache. | ||
| func (c *replicationLagCache) maxLag() (maxLag uint32) { | ||
|
||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| for key := range c.entries { | ||
| if c.isIgnored(key) { | ||
| continue | ||
| } | ||
|
|
||
| entry := c.entries[key] | ||
| if entry == nil { | ||
| continue | ||
| } | ||
|
|
||
| latest := entry.latest() | ||
| if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { | ||
| maxLag = lag | ||
| } | ||
| } | ||
|
|
||
| return maxLag | ||
| } | ||
|
|
||
| // latest returns the current lag record for the given LegacyTabletStats.Key string. | ||
| // A zero record is returned if there is no latest entry. | ||
| func (c *replicationLagCache) latest(key string) replicationLagRecord { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| entry, ok := c.entries[key] | ||
| if !ok { | ||
| return replicationLagRecord{} | ||
|
|
@@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { | |
| // or just after it. | ||
| // If there is no such record, a zero record is returned. | ||
| func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
| entry, ok := c.entries[key] | ||
| if !ok { | ||
| return replicationLagRecord{} | ||
|
|
@@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag | |
| // sortByLag sorts all replicas by their latest replication lag value and | ||
| // tablet uid and updates the c.slowReplicas set. | ||
| func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| // Reset the current list of ignored replicas. | ||
| c.slowReplicas = make(map[string]bool) | ||
|
|
||
|
|
@@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { | |
| // this slow replica. | ||
| // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. | ||
| func (c *replicationLagCache) ignoreSlowReplica(key string) bool { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
| if len(c.slowReplicas) == 0 { | ||
| // No slow replicas at all. | ||
| return false | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -245,22 +245,10 @@ func (t *ThrottlerImpl) Throttle(threadID int) time.Duration { | |
| // the provided type, excluding ignored tablets. | ||
| func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 { | ||
| cache := t.maxReplicationLagModule.lagCacheByType(tabletType) | ||
|
|
||
| var maxLag uint32 | ||
| cacheEntries := cache.entries | ||
|
|
||
| for key := range cacheEntries { | ||
| if cache.isIgnored(key) { | ||
| continue | ||
| } | ||
|
|
||
| lag := cache.latest(key).Stats.ReplicationLagSeconds | ||
| if lag > maxLag { | ||
| maxLag = lag | ||
| } | ||
| if cache == nil { | ||
| return 0 | ||
| } | ||
|
|
||
| return maxLag | ||
| return cache.maxLag() | ||
|
||
| } | ||
|
|
||
| // ThreadFinished marks threadID as finished and redistributes the thread's | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,13 +17,25 @@ limitations under the License. | |
| package throttler | ||
|
|
||
| import ( | ||
| "context" | ||
| "runtime" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "vitess.io/vitess/go/vt/discovery" | ||
| "vitess.io/vitess/go/vt/proto/query" | ||
| "vitess.io/vitess/go/vt/proto/topodata" | ||
| ) | ||
|
|
||
| // testTabletTypes is the list of tablet types to test. | ||
| var testTabletTypes = []topodata.TabletType{ | ||
| topodata.TabletType_REPLICA, | ||
| topodata.TabletType_RDONLY, | ||
| } | ||
|
|
||
| // The main purpose of the benchmarks below is to demonstrate the functionality | ||
| // of the throttler in the real-world (using a non-faked time.Now). | ||
| // The benchmark values should be as close as possible to the request interval | ||
|
|
@@ -402,3 +414,73 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { | |
| }() | ||
| throttler.ThreadFinished(0) | ||
| } | ||
|
|
||
| func TestThrottlerMaxLag(t *testing.T) { | ||
| fc := &fakeClock{} | ||
| th, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) | ||
| require.NoError(t, err) | ||
| throttler := th.(*ThrottlerImpl) | ||
| defer throttler.Close() | ||
|
|
||
| require.NotNil(t, throttler) | ||
| require.NotNil(t, throttler.maxReplicationLagModule) | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
timvaillancourt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| defer cancel() | ||
|
|
||
| var wg sync.WaitGroup | ||
|
|
||
| // run .add() and .MaxLag() concurrently to detect races | ||
| for _, tabletType := range testTabletTypes { | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
| throttler.MaxLag(tabletType) | ||
| } | ||
| } | ||
|
Comment on lines
+438
to
+445
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like a busy loop. potentially running thousands of time in the span of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shlomi-noach yes, these busy loops were used to reproduce the race in unit tests, before I fixed the race |
||
| }() | ||
|
|
||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
|
Comment on lines
+451
to
+455
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like a busy loop. Is this intentional? For the duration of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @shlomi-noach yes, the intent here was to call |
||
| cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) | ||
| require.NotNil(t, cache) | ||
| cache.add(replicationLagRecord{ | ||
| time: time.Now(), | ||
| TabletHealth: discovery.TabletHealth{ | ||
| Serving: true, | ||
| Stats: &query.RealtimeStats{ | ||
| ReplicationLagSeconds: 5, | ||
| }, | ||
| Tablet: &topodata.Tablet{ | ||
| Hostname: t.Name(), | ||
| Type: tabletType, | ||
| PortMap: map[string]int32{ | ||
| "test": 15999, | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| } | ||
| } | ||
| }() | ||
| } | ||
| time.Sleep(time.Second) | ||
| cancel() | ||
| wg.Wait() | ||
|
|
||
| // check .MaxLag() | ||
| for _, tabletType := range testTabletTypes { | ||
| require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.