Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ lint:
golangci-lint run

# Ensure no blacklisted package is imported.
faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,golang.org/x/net/context=context" ./pkg/... ./cmd/... ./tools/... ./integration/...
faillint -paths "github.com/bmizerany/assert=github.com/stretchr/testify/assert,\
golang.org/x/net/context=context,\
sync/atomic=go.uber.org/atomic" ./pkg/... ./cmd/... ./tools/... ./integration/...

# Validate Kubernetes spec files. Requires:
# https://kubeval.instrumenta.dev
Expand Down
6 changes: 3 additions & 3 deletions pkg/chunk/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package cache_test
import (
"context"
"errors"
"sync/atomic"
"testing"

"github.com/bradfitz/gomemcache/memcache"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/chunk/cache"
)
Expand Down Expand Up @@ -71,7 +71,7 @@ func testMemcache(t *testing.T, memcache *cache.Memcached) {
// mockMemcache whose calls fail 1/3rd of the time.
type mockMemcacheFailing struct {
*mockMemcache
calls uint64
calls atomic.Uint64
}

func newMockMemcacheFailing() *mockMemcacheFailing {
Expand All @@ -81,7 +81,7 @@ func newMockMemcacheFailing() *mockMemcacheFailing {
}

func (c *mockMemcacheFailing) GetMulti(keys []string) (map[string]*memcache.Item, error) {
calls := atomic.AddUint64(&c.calls, 1)
calls := c.calls.Inc()
if calls%3 == 0 {
return nil, errors.New("fail")
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/ingester/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"
)
Expand All @@ -24,8 +24,7 @@ type fpMappings map[model.Fingerprint]map[string]model.Fingerprint
// fpMapper is used to map fingerprints in order to work around fingerprint
// collisions.
type fpMapper struct {
// highestMappedFP has to be aligned for atomic operations.
highestMappedFP model.Fingerprint
highestMappedFP atomic.Uint64

mtx sync.RWMutex // Protects mappings.
mappings fpMappings
Expand Down Expand Up @@ -130,7 +129,7 @@ func (m *fpMapper) maybeAddMapping(
}

func (m *fpMapper) nextMappedFP() model.Fingerprint {
mappedFP := model.Fingerprint(atomic.AddUint64((*uint64)(&m.highestMappedFP), 1))
mappedFP := model.Fingerprint(m.highestMappedFP.Inc())
if mappedFP > maxMappedFP {
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/ingester/rate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package ingester

import (
"sync"
"sync/atomic"
"time"

"go.uber.org/atomic"
)

// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
type ewmaRate struct {
newEvents int64
newEvents atomic.Int64
alpha float64
interval time.Duration
lastRate float64
Expand All @@ -32,8 +33,8 @@ func (r *ewmaRate) rate() float64 {

// tick assumes to be called every r.interval.
func (r *ewmaRate) tick() {
newEvents := atomic.LoadInt64(&r.newEvents)
atomic.AddInt64(&r.newEvents, -newEvents)
newEvents := r.newEvents.Load()
r.newEvents.Sub(newEvents)
instantRate := float64(newEvents) / r.interval.Seconds()

r.mutex.Lock()
Expand All @@ -49,9 +50,9 @@ func (r *ewmaRate) tick() {

// inc counts one event.
func (r *ewmaRate) inc() {
atomic.AddInt64(&r.newEvents, 1)
r.newEvents.Inc()
}

func (r *ewmaRate) add(delta int64) {
atomic.AddInt64(&r.newEvents, delta)
r.newEvents.Add(delta)
}
10 changes: 5 additions & 5 deletions pkg/ingester/series_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package ingester

import (
"sync"
"sync/atomic"
"unsafe"

"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/util"
)
Expand All @@ -16,7 +16,7 @@ const seriesMapShards = 128
// goroutine-safe. A seriesMap is effectively a goroutine-safe version of
// map[model.Fingerprint]*memorySeries.
type seriesMap struct {
size int32
size atomic.Int32
shards []shard
}

Expand Down Expand Up @@ -65,7 +65,7 @@ func (sm *seriesMap) put(fp model.Fingerprint, s *memorySeries) {
shard.mtx.Unlock()

if !ok {
atomic.AddInt32(&sm.size, 1)
sm.size.Inc()
}
}

Expand All @@ -77,7 +77,7 @@ func (sm *seriesMap) del(fp model.Fingerprint) {
delete(shard.m, fp)
shard.mtx.Unlock()
if ok {
atomic.AddInt32(&sm.size, -1)
sm.size.Dec()
}
}

Expand Down Expand Up @@ -106,5 +106,5 @@ func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
}

func (sm *seriesMap) length() int {
return int(atomic.LoadInt32(&sm.size))
return int(sm.size.Load())
}
13 changes: 6 additions & 7 deletions pkg/querier/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

Expand All @@ -25,7 +24,7 @@ import (
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
uber_atomic "go.uber.org/atomic"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/querier"
Expand Down Expand Up @@ -172,10 +171,10 @@ func TestFrontend_RequestHostHeaderWhenDownstreamURLIsConfigured(t *testing.T) {
// TestFrontendCancel ensures that when client requests are cancelled,
// the underlying query is correctly cancelled _and not retried_.
func TestFrontendCancel(t *testing.T) {
var tries int32
var tries atomic.Int32
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-r.Context().Done()
atomic.AddInt32(&tries, 1)
tries.Inc()
})
test := func(addr string) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
Expand All @@ -195,10 +194,10 @@ func TestFrontendCancel(t *testing.T) {
require.Error(t, err)

time.Sleep(100 * time.Millisecond)
assert.Equal(t, int32(1), atomic.LoadInt32(&tries))
assert.Equal(t, int32(1), tries.Load())
}
testFrontend(t, defaultFrontendConfig(), handler, test, false)
tries = 0
tries.Store(0)
testFrontend(t, defaultFrontendConfig(), handler, test, true)
}

Expand Down Expand Up @@ -234,7 +233,7 @@ func TestFrontendCheckReady(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
f := &Frontend{
connectedClients: uber_atomic.NewInt32(tt.connectedClients),
connectedClients: atomic.NewInt32(tt.connectedClients),
log: log.NewNopLogger(),
cfg: Config{
DownstreamURL: tt.downstreamURL,
Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/queryrange/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import (
"errors"
fmt "fmt"
"net/http"
"sync/atomic"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"go.uber.org/atomic"
)

func TestRetry(t *testing.T) {
var try int32
var try atomic.Int32

for _, tc := range []struct {
name string
Expand All @@ -25,7 +25,7 @@ func TestRetry(t *testing.T) {
{
name: "retry failures",
handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
if atomic.AddInt32(&try, 1) == 5 {
if try.Inc() == 5 {
return &PrometheusResponse{Status: "Hello World"}, nil
}
return nil, fmt.Errorf("fail")
Expand All @@ -49,7 +49,7 @@ func TestRetry(t *testing.T) {
{
name: "last error",
handler: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
if atomic.AddInt32(&try, 1) == 5 {
if try.Inc() == 5 {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "Bad Request")
}
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "Internal Server Error")
Expand All @@ -58,7 +58,7 @@ func TestRetry(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
try = 0
try.Store(0)
h := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(tc.handler)
resp, err := h.Do(context.Background(), nil)
require.Equal(t, tc.err, err)
Expand All @@ -68,26 +68,26 @@ func TestRetry(t *testing.T) {
}

func Test_RetryMiddlewareCancel(t *testing.T) {
var try int32
var try atomic.Int32
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, err := NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(
HandlerFunc(func(c context.Context, r Request) (Response, error) {
atomic.AddInt32(&try, 1)
try.Inc()
return nil, ctx.Err()
}),
).Do(ctx, nil)
require.Equal(t, int32(0), try)
require.Equal(t, int32(0), try.Load())
require.Equal(t, ctx.Err(), err)

ctx, cancel = context.WithCancel(context.Background())
_, err = NewRetryMiddleware(log.NewNopLogger(), 5, nil).Wrap(
HandlerFunc(func(c context.Context, r Request) (Response, error) {
atomic.AddInt32(&try, 1)
try.Inc()
cancel()
return nil, errors.New("failed")
}),
).Do(ctx, nil)
require.Equal(t, int32(1), try)
require.Equal(t, int32(1), try.Load())
require.Equal(t, ctx.Err(), err)
}
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
)

const seconds = 1e3 // 1e3 milliseconds per second.
Expand Down Expand Up @@ -261,11 +261,11 @@ func TestSplitByDay(t *testing.T) {
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {

actualCount := int32(0)
var actualCount atomic.Int32
s := httptest.NewServer(
middleware.AuthenticateUser.Wrap(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&actualCount, 1)
actualCount.Inc()
_, _ = w.Write([]byte(responseBody))
}),
),
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestSplitByDay(t *testing.T) {
bs, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, tc.expectedBody, string(bs))
require.Equal(t, tc.expectedQueryCount, actualCount)
require.Equal(t, tc.expectedQueryCount, actualCount.Load())
})
}
}
25 changes: 13 additions & 12 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"

"go.uber.org/atomic"
)

type batchTracker struct {
rpcsPending int32
rpcsFailed int32
rpcsPending atomic.Int32
rpcsFailed atomic.Int32
done chan struct{}
err chan error
}
Expand All @@ -23,8 +24,8 @@ type ingester struct {
type itemTracker struct {
minSuccess int
maxFailures int
succeeded int32
failed int32
succeeded atomic.Int32
failed atomic.Int32
}

// DoBatch request against a set of keys in the ring, handling replication and
Expand Down Expand Up @@ -70,10 +71,10 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
}

tracker := batchTracker{
rpcsPending: int32(len(itemTrackers)),
done: make(chan struct{}, 1),
err: make(chan error, 1),
done: make(chan struct{}, 1),
err: make(chan error, 1),
}
tracker.rpcsPending.Store(int32(len(itemTrackers)))

var wg sync.WaitGroup

Expand Down Expand Up @@ -115,17 +116,17 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
// goroutine will write to either channel.
for i := range sampleTrackers {
if err != nil {
if atomic.AddInt32(&sampleTrackers[i].failed, 1) <= int32(sampleTrackers[i].maxFailures) {
if sampleTrackers[i].failed.Inc() <= int32(sampleTrackers[i].maxFailures) {
continue
}
if atomic.AddInt32(&b.rpcsFailed, 1) == 1 {
if b.rpcsFailed.Inc() == 1 {
b.err <- err
}
} else {
if atomic.AddInt32(&sampleTrackers[i].succeeded, 1) != int32(sampleTrackers[i].minSuccess) {
if sampleTrackers[i].succeeded.Inc() != int32(sampleTrackers[i].minSuccess) {
continue
}
if atomic.AddInt32(&b.rpcsPending, -1) == 0 {
if b.rpcsPending.Dec() == 0 {
b.done <- struct{}{}
}
}
Expand Down
Loading