diff --git a/Makefile b/Makefile index 39c90ed1401..9c301607825 100644 --- a/Makefile +++ b/Makefile @@ -140,7 +140,9 @@ lint: golangci-lint run # Ensure no blacklisted package is imported. - faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,golang.org/x/net/context=context" ./pkg/... ./cmd/... ./tools/... ./integration/... + faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\ + golang.org/x/net/context=context,\ + sync/atomic=go.uber.org/atomic" ./pkg/... ./cmd/... ./tools/... ./integration/... # Validate Kubernetes spec files. Requires: # https://kubeval.instrumenta.dev diff --git a/pkg/chunk/cache/memcached_test.go b/pkg/chunk/cache/memcached_test.go index 5bf8b6e2aab..ee5596f2afd 100644 --- a/pkg/chunk/cache/memcached_test.go +++ b/pkg/chunk/cache/memcached_test.go @@ -3,12 +3,12 @@ package cache_test import ( "context" "errors" - "sync/atomic" "testing" "github.com/bradfitz/gomemcache/memcache" "github.com/go-kit/kit/log" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/chunk/cache" ) @@ -71,7 +71,7 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) { // mockMemcache whose calls fail 1/3rd of the time. type mockMemcacheFailing struct { *mockMemcache - calls uint64 + calls atomic.Uint64 } func newMockMemcacheFailing() *mockMemcacheFailing { @@ -81,7 +81,7 @@ func newMockMemcacheFailing() *mockMemcacheFailing { } func (c *mockMemcacheFailing) GetMulti(keys []string) (map[string]*memcache.Item, error) { - calls := atomic.AddUint64(&c.calls, 1) + calls := c.calls.Inc() if calls%3 == 0 { return nil, errors.New("fail") } diff --git a/pkg/ingester/mapper.go b/pkg/ingester/mapper.go index 87c1f622b7d..18977e7176e 100644 --- a/pkg/ingester/mapper.go +++ b/pkg/ingester/mapper.go @@ -5,10 +5,10 @@ import ( "sort" "strings" "sync" - "sync/atomic" "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" ) @@ -24,8 +24,7 @@ type fpMappings map[model.Fingerprint]map[string]model.Fingerprint // fpMapper is used to map fingerprints in order to work around fingerprint // collisions. type fpMapper struct { - // highestMappedFP has to be aligned for atomic operations. - highestMappedFP model.Fingerprint + highestMappedFP atomic.Uint64 mtx sync.RWMutex // Protects mappings. mappings fpMappings @@ -130,7 +129,7 @@ func (m *fpMapper) maybeAddMapping( } func (m *fpMapper) nextMappedFP() model.Fingerprint { - mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1)) + mappedFP := model.Fingerprint(m.highestMappedFP.Inc()) if mappedFP > maxMappedFP { panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP)) } diff --git a/pkg/ingester/rate.go b/pkg/ingester/rate.go index d0c348677b4..ecabd8b783a 100644 --- a/pkg/ingester/rate.go +++ b/pkg/ingester/rate.go @@ -2,13 +2,14 @@ package ingester import ( "sync" - "sync/atomic" "time" + + "go.uber.org/atomic" ) // ewmaRate tracks an exponentially weighted moving average of a per-second rate. type ewmaRate struct { - newEvents int64 + newEvents atomic.Int64 alpha float64 interval time.Duration lastRate float64 @@ -32,8 +33,8 @@ func (r *ewmaRate) rate() float64 { // tick assumes to be called every r.interval. func (r *ewmaRate) tick() { - newEvents := atomic.LoadInt64(&r.newEvents) - atomic.AddInt64(&r.newEvents, -newEvents) + newEvents := r.newEvents.Load() + r.newEvents.Sub(newEvents) instantRate := float64(newEvents) / r.interval.Seconds() r.mutex.Lock() @@ -49,9 +50,9 @@ func (r *ewmaRate) tick() { // inc counts one event. func (r *ewmaRate) inc() { - atomic.AddInt64(&r.newEvents, 1) + r.newEvents.Inc() } func (r *ewmaRate) add(delta int64) { - atomic.AddInt64(&r.newEvents, delta) + r.newEvents.Add(delta) } diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index a8e4ba70613..4d4a9a5b669 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -2,10 +2,10 @@ package ingester import ( "sync" - "sync/atomic" "unsafe" "github.com/prometheus/common/model" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" ) @@ -16,7 +16,7 @@ const seriesMapShards = 128 // goroutine-safe. A seriesMap is effectively a goroutine-safe version of // map[model.Fingerprint]*memorySeries. type seriesMap struct { - size int32 + size atomic.Int32 shards []shard } @@ -65,7 +65,7 @@ func (sm *seriesMap) put(fp model.Fingerprint, s *memorySeries) { shard.mtx.Unlock() if !ok { - atomic.AddInt32(&sm.size, 1) + sm.size.Inc() } } @@ -77,7 +77,7 @@ func (sm *seriesMap) del(fp model.Fingerprint) { delete(shard.m, fp) shard.mtx.Unlock() if ok { - atomic.AddInt32(&sm.size, -1) + sm.size.Dec() } } @@ -106,5 +106,5 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair { } func (sm *seriesMap) length() int { - return int(atomic.LoadInt32(&sm.size)) + return int(sm.size.Load()) } diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 37e0398886d..86bc4126276 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -8,7 +8,6 @@ import ( "net" "net/http" "net/http/httptest" - "sync/atomic" "testing" "time" @@ -25,7 +24,7 @@ import ( httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" - uber_atomic "go.uber.org/atomic" + "go.uber.org/atomic" "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/querier" @@ -172,10 +171,10 @@ func TestFrontend_RequestHostHeaderWhenDownstreamURLIsConfigured(t *testing.T) { // TestFrontendCancel ensures that when client requests are cancelled, // the underlying query is correctly cancelled _and not retried_. func TestFrontendCancel(t *testing.T) { - var tries int32 + var tries atomic.Int32 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { <-r.Context().Done() - atomic.AddInt32(&tries, 1) + tries.Inc() }) test := func(addr string) { req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil) @@ -195,10 +194,10 @@ func TestFrontendCancel(t *testing.T) { require.Error(t, err) time.Sleep(100 * time.Millisecond) - assert.Equal(t, int32(1), atomic.LoadInt32(&tries)) + assert.Equal(t, int32(1), tries.Load()) } testFrontend(t, defaultFrontendConfig(), handler, test, false) - tries = 0 + tries.Store(0) testFrontend(t, defaultFrontendConfig(), handler, test, true) } @@ -234,7 +233,7 @@ func TestFrontendCheckReady(t *testing.T) { } { t.Run(tt.name, func(t *testing.T) { f := &Frontend{ - connectedClients: uber_atomic.NewInt32(tt.connectedClients), + connectedClients: atomic.NewInt32(tt.connectedClients), log: log.NewNopLogger(), cfg: Config{ DownstreamURL: tt.downstreamURL, diff --git a/pkg/querier/queryrange/retry_test.go b/pkg/querier/queryrange/retry_test.go index e7d1da92129..ca89639d8ea 100644 --- a/pkg/querier/queryrange/retry_test.go +++ b/pkg/querier/queryrange/retry_test.go @@ -5,16 +5,16 @@ import ( "errors" fmt "fmt" "net/http" - "sync/atomic" "testing" "github.com/go-kit/kit/log" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" + "go.uber.org/atomic" ) func TestRetry(t *testing.T) { - var try int32 + var try atomic.Int32 for _, tc := range []struct { name string @@ -25,7 +25,7 @@ func TestRetry(t *testing.T) { { name: "retry failures", handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) { - if atomic.AddInt32(&try, 1) == 5 { + if try.Inc() == 5 { return &PrometheusResponse{Status: "Hello World"}, nil } return nil, fmt.Errorf("fail") @@ -49,7 +49,7 @@ func TestRetry(t *testing.T) { { name: "last error", handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) { - if atomic.AddInt32(&try, 1) == 5 { + if try.Inc() == 5 { return nil, httpgrpc.Errorf(http.StatusBadRequest, "Bad Request") } return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error") @@ -58,7 +58,7 @@ func TestRetry(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - try = 0 + try.Store(0) h := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(tc.handler) resp, err := h.Do(context.Background(), nil) require.Equal(t, tc.err, err) @@ -68,26 +68,26 @@ func TestRetry(t *testing.T) { } func Test_RetryMiddlewareCancel(t *testing.T) { - var try int32 + var try atomic.Int32 ctx, cancel := context.WithCancel(context.Background()) cancel() _, err := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap( HandlerFunc(func(c context.Context, r Request) (Response, error) { - atomic.AddInt32(&try, 1) + try.Inc() return nil, ctx.Err() }), ).Do(ctx, nil) - require.Equal(t, int32(0), try) + require.Equal(t, int32(0), try.Load()) require.Equal(t, ctx.Err(), err) ctx, cancel = context.WithCancel(context.Background()) _, err = NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap( HandlerFunc(func(c context.Context, r Request) (Response, error) { - atomic.AddInt32(&try, 1) + try.Inc() cancel() return nil, errors.New("failed") }), ).Do(ctx, nil) - require.Equal(t, int32(1), try) + require.Equal(t, int32(1), try.Load()) require.Equal(t, ctx.Err(), err) } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index 7ffa30e9ce3..40fd126d311 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -7,13 +7,13 @@ import ( "net/http/httptest" "net/url" "strconv" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "go.uber.org/atomic" ) const seconds = 1e3 // 1e3 milliseconds per second. @@ -261,11 +261,11 @@ func TestSplitByDay(t *testing.T) { } { t.Run(strconv.Itoa(i), func(t *testing.T) { - actualCount := int32(0) + var actualCount atomic.Int32 s := httptest.NewServer( middleware.AuthenticateUser.Wrap( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&actualCount, 1) + actualCount.Inc() _, _ = w.Write([]byte(responseBody)) }), ), @@ -293,7 +293,7 @@ func TestSplitByDay(t *testing.T) { bs, err := ioutil.ReadAll(resp.Body) require.NoError(t, err) require.Equal(t, tc.expectedBody, string(bs)) - require.Equal(t, tc.expectedQueryCount, actualCount) + require.Equal(t, tc.expectedQueryCount, actualCount.Load()) }) } } diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go index c5e6a00b559..89a24656aac 100644 --- a/pkg/ring/batch.go +++ b/pkg/ring/batch.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "sync" - "sync/atomic" + + "go.uber.org/atomic" ) type batchTracker struct { - rpcsPending int32 - rpcsFailed int32 + rpcsPending atomic.Int32 + rpcsFailed atomic.Int32 done chan struct{} err chan error } @@ -23,8 +24,8 @@ type ingester struct { type itemTracker struct { minSuccess int maxFailures int - succeeded int32 - failed int32 + succeeded atomic.Int32 + failed atomic.Int32 } // DoBatch request against a set of keys in the ring, handling replication and @@ -70,10 +71,10 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges } tracker := batchTracker{ - rpcsPending: int32(len(itemTrackers)), - done: make(chan struct{}, 1), - err: make(chan error, 1), + done: make(chan struct{}, 1), + err: make(chan error, 1), } + tracker.rpcsPending.Store(int32(len(itemTrackers))) var wg sync.WaitGroup @@ -115,17 +116,17 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) { // goroutine will write to either channel. for i := range sampleTrackers { if err != nil { - if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) { + if sampleTrackers[i].failed.Inc() <= int32(sampleTrackers[i].maxFailures) { continue } - if atomic.AddInt32(&b.rpcsFailed, 1) == 1 { + if b.rpcsFailed.Inc() == 1 { b.err <- err } } else { - if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) { + if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) { continue } - if atomic.AddInt32(&b.rpcsPending, -1) == 0 { + if b.rpcsPending.Dec() == 0 { b.done <- struct{}{} } } diff --git a/pkg/ring/kv/memberlist/tcp_transport.go b/pkg/ring/kv/memberlist/tcp_transport.go index dc149078798..d6af8b2ec99 100644 --- a/pkg/ring/kv/memberlist/tcp_transport.go +++ b/pkg/ring/kv/memberlist/tcp_transport.go @@ -9,7 +9,6 @@ import ( "io/ioutil" "net" "sync" - "sync/atomic" "time" "github.com/go-kit/kit/log" @@ -17,6 +16,7 @@ import ( "github.com/hashicorp/go-sockaddr" "github.com/hashicorp/memberlist" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -79,7 +79,7 @@ type TCPTransport struct { wg sync.WaitGroup tcpListeners []*net.TCPListener - shutdown int32 + shutdown atomic.Int32 advertiseMu sync.RWMutex advertiseAddr string @@ -172,7 +172,7 @@ func (t *TCPTransport) tcpListen(tcpLn *net.TCPListener) { for { conn, err := tcpLn.AcceptTCP() if err != nil { - if s := atomic.LoadInt32(&t.shutdown); s == 1 { + if s := t.shutdown.Load(); s == 1 { break } @@ -503,7 +503,7 @@ func (t *TCPTransport) StreamCh() <-chan net.Conn { // transport a chance to clean up any listeners. func (t *TCPTransport) Shutdown() error { // This will avoid log spam about errors when we shut down. - atomic.StoreInt32(&t.shutdown, 1) + t.shutdown.Store(1) // Rip through all the connections and shut them down. for _, conn := range t.tcpListeners { diff --git a/pkg/storage/tsdb/ref_cache.go b/pkg/storage/tsdb/ref_cache.go index 0c5447c135a..27f625b24c7 100644 --- a/pkg/storage/tsdb/ref_cache.go +++ b/pkg/storage/tsdb/ref_cache.go @@ -2,11 +2,11 @@ package tsdb import ( "sync" - "sync/atomic" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" @@ -41,7 +41,7 @@ type refCacheStripe struct { type refCacheEntry struct { lbs labels.Labels ref uint64 - touchedAt int64 // Unix nano time. + touchedAt atomic.Int64 // Unix nano time. } // NewRefCache makes a new RefCache. @@ -93,7 +93,7 @@ func (s *refCacheStripe) ref(now time.Time, series labels.Labels, fp model.Finge for ix := range entries { if labels.Equal(entries[ix].lbs, series) { // Since we use read-only lock, we need to use atomic update. - atomic.StoreInt64(&entries[ix].touchedAt, now.UnixNano()) + entries[ix].touchedAt.Store(now.UnixNano()) return entries[ix].ref, true } } @@ -112,13 +112,15 @@ func (s *refCacheStripe) setRef(now time.Time, series labels.Labels, fp model.Fi } entry.ref = ref - entry.touchedAt = now.UnixNano() + entry.touchedAt.Store(now.UnixNano()) s.refs[fp][ix] = entry return } // The entry doesn't exist, so we have to add a new one. - s.refs[fp] = append(s.refs[fp], refCacheEntry{lbs: series, ref: ref, touchedAt: now.UnixNano()}) + refCacheEntry := refCacheEntry{lbs: series, ref: ref} + refCacheEntry.touchedAt.Store(now.UnixNano()) + s.refs[fp] = append(s.refs[fp], refCacheEntry) } func (s *refCacheStripe) purge(keepUntil time.Time) { @@ -131,7 +133,7 @@ func (s *refCacheStripe) purge(keepUntil time.Time) { // Since we do expect very few fingerprint collisions, we // have an optimized implementation for the common case. if len(entries) == 1 { - if entries[0].touchedAt < keepUntilNanos { + if entries[0].touchedAt.Load() < keepUntilNanos { delete(s.refs, fp) } @@ -141,7 +143,7 @@ func (s *refCacheStripe) purge(keepUntil time.Time) { // We have more entries, which means there's a collision, // so we have to iterate over the entries. for i := 0; i < len(entries); { - if entries[i].touchedAt < keepUntilNanos { + if entries[i].touchedAt.Load() < keepUntilNanos { entries = append(entries[:i], entries[i+1:]...) } else { i++ diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 23f598b6f71..b4679e7406c 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -6,7 +6,6 @@ import ( "os" "path/filepath" "strings" - "sync/atomic" "testing" "github.com/go-kit/kit/log" @@ -20,6 +19,7 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/logging" + "go.uber.org/atomic" "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/storage/backend/filesystem" @@ -190,15 +190,15 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { require.NoError(t, err) // Sync user stores and count the number of times the callback is called. - storesCount := int32(0) + var storesCount atomic.Int32 err = stores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { - atomic.AddInt32(&storesCount, 1) + storesCount.Inc() return nil }) assert.NoError(t, err) bucketClient.AssertNumberOfCalls(t, "Iter", 1) - assert.Equal(t, storesCount, int32(3)) + assert.Equal(t, storesCount.Load(), int32(3)) } func prepareStorageConfig(t *testing.T) (cortex_tsdb.BlocksStorageConfig, func()) { diff --git a/pkg/util/events.go b/pkg/util/events.go index ec96106be07..dba9ec30df3 100644 --- a/pkg/util/events.go +++ b/pkg/util/events.go @@ -2,9 +2,9 @@ package util import ( "os" - "sync/atomic" "github.com/go-kit/kit/log" + "go.uber.org/atomic" ) // Provide an "event" interface for observability @@ -43,11 +43,11 @@ func newEventLogger(freq int) log.Logger { type samplingFilter struct { next log.Logger freq int - count int64 + count atomic.Int64 } func (e *samplingFilter) Log(keyvals ...interface{}) error { - count := atomic.AddInt64(&e.count, 1) + count := e.count.Inc() if count%int64(e.freq) == 0 { return e.next.Log(keyvals...) } diff --git a/pkg/util/services/services_test.go b/pkg/util/services/services_test.go index 770bad2cc8c..431ac0c0c87 100644 --- a/pkg/util/services/services_test.go +++ b/pkg/util/services/services_test.go @@ -3,11 +3,11 @@ package services import ( "context" "errors" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.uber.org/atomic" ) func TestIdleService(t *testing.T) { @@ -41,10 +41,10 @@ func TestIdleService(t *testing.T) { func TestTimerService(t *testing.T) { t.Parallel() - iterations := int64(0) + var iterations atomic.Uint64 s := NewTimerService(100*time.Millisecond, nil, func(ctx context.Context) error { - atomic.AddInt64(&iterations, 1) + iterations.Inc() return nil }, nil) defer s.StopAsync() @@ -57,7 +57,7 @@ func TestTimerService(t *testing.T) { time.Sleep(1 * time.Second) - val := atomic.LoadInt64(&iterations) + val := iterations.Load() require.NotZero(t, val) // we should observe some iterations now s.StopAsync()