diff --git a/catchup/service.go b/catchup/service.go
index bc23b3d736..328d6e5828 100644
--- a/catchup/service.go
+++ b/catchup/service.go
@@ -70,10 +70,10 @@ type Ledger interface {
 // Service represents the catchup service. Once started and until it is stopped, it ensures that the ledger is up to date with network.
 type Service struct {
-	syncStartNS int64 // at top of struct to keep 64 bit aligned for atomic.* ops
 	// disableSyncRound, provided externally, is the first round we will _not_ fetch from the network
 	// any round >= disableSyncRound will not be fetched. If set to 0, it will be disregarded.
-	disableSyncRound uint64
+	disableSyncRound atomic.Uint64
+	syncStartNS      atomic.Int64
 	cfg              config.Local
 	ledger           Ledger
 	ctx              context.Context
@@ -94,7 +94,7 @@ type Service struct {
 	// The channel gets closed when the initial sync is complete. This allows for other services to avoid
 	// the overhead of starting prematurely (before this node is caught-up and can validate messages for example).
 	InitialSyncDone              chan struct{}
-	initialSyncNotified          uint32
+	initialSyncNotified          atomic.Uint32
 	protocolErrorLogged          bool
 	unmatchedPendingCertificates <-chan PendingUnmatchedCertificate
 	// This channel signals periodSync to attempt catchup immediately. This allows us to start fetching rounds from
@@ -140,7 +140,7 @@ func MakeService(log logging.Logger, config config.Local, net network.GossipNode
 // Start the catchup service
 func (s *Service) Start() {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
-	atomic.StoreUint32(&s.initialSyncNotified, 0)
+	s.initialSyncNotified.Store(0)
 	s.InitialSyncDone = make(chan struct{})
 	s.workers.Add(1)
 	go s.periodicSync()
@@ -150,7 +150,7 @@ func (s *Service) Start() {
 func (s *Service) Stop() {
 	s.cancel()
 	s.workers.Wait()
-	if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
+	if s.initialSyncNotified.CompareAndSwap(0, 1) {
 		close(s.InitialSyncDone)
 	}
 }
@@ -159,8 +159,8 @@ func (s *Service) Stop() {
 // or attempting to catchup after too-long waiting for next block.
 // Also returns a 2nd bool indicating if this is our initial sync
 func (s *Service) IsSynchronizing() (synchronizing bool, initialSync bool) {
-	synchronizing = atomic.LoadInt64(&s.syncStartNS) != 0
-	initialSync = atomic.LoadUint32(&s.initialSyncNotified) == 0
+	synchronizing = s.syncStartNS.Load() != 0
+	initialSync = s.initialSyncNotified.Load() == 0
 	return
 }
@@ -180,25 +180,25 @@ func (s *Service) SetDisableSyncRound(rnd uint64) error {
 	if basics.Round(rnd) < s.ledger.LastRound() {
 		return ErrSyncRoundInvalid
 	}
-	atomic.StoreUint64(&s.disableSyncRound, rnd)
+	s.disableSyncRound.Store(rnd)
 	s.triggerSync()
 	return nil
 }
 // UnsetDisableSyncRound removes any previously set disabled sync round
 func (s *Service) UnsetDisableSyncRound() {
-	atomic.StoreUint64(&s.disableSyncRound, 0)
+	s.disableSyncRound.Store(0)
 	s.triggerSync()
 }
 // GetDisableSyncRound returns the disabled sync round
 func (s *Service) GetDisableSyncRound() uint64 {
-	return atomic.LoadUint64(&s.disableSyncRound)
+	return s.disableSyncRound.Load()
 }
 // SynchronizingTime returns the time we've been performing a catchup operation (0 if not currently catching up)
 func (s *Service) SynchronizingTime() time.Duration {
-	startNS := atomic.LoadInt64(&s.syncStartNS)
+	startNS := s.syncStartNS.Load()
 	if startNS == 0 {
 		return time.Duration(0)
 	}
@@ -608,8 +608,8 @@ func (s *Service) sync() {
 	start := time.Now()
 	timeInNS := start.UnixNano()
-	if !atomic.CompareAndSwapInt64(&s.syncStartNS, 0, timeInNS) {
-		s.log.Infof("resuming previous sync from %d (now=%d)", atomic.LoadInt64(&s.syncStartNS), timeInNS)
+	if !s.syncStartNS.CompareAndSwap(0, timeInNS) {
+		s.log.Infof("resuming previous sync from %d (now=%d)", s.syncStartNS.Load(), timeInNS)
 	}
 	pr := s.ledger.LastRound()
@@ -632,10 +632,10 @@ func (s *Service) sync() {
 	// if the catchupWriting flag is set, it means that we aborted the sync due to the ledger writing the catchup file.
 	if !s.suspendForCatchpointWriting {
 		// in that case, don't change the timer so that the "timer" would keep running.
-		atomic.StoreInt64(&s.syncStartNS, 0)
+		s.syncStartNS.Store(0)
 		// close the initial sync channel if not already close
-		if atomic.CompareAndSwapUint32(&s.initialSyncNotified, 0, 1) {
+		if s.initialSyncNotified.CompareAndSwap(0, 1) {
 			close(s.InitialSyncDone)
 			initSync = true
 		}
diff --git a/catchup/service_test.go b/catchup/service_test.go
index 6807b11194..374ed8cc57 100644
--- a/catchup/service_test.go
+++ b/catchup/service_test.go
@@ -21,7 +21,6 @@ import (
 	"errors"
 	"math/rand"
 	"sync"
-	"sync/atomic"
 	"testing"
 	"time"
@@ -1085,7 +1084,7 @@ func TestSynchronizingTime(t *testing.T) {
 	s := MakeService(logging.Base(), cfg, &httpTestPeerSource{}, ledger, &mockedAuthenticator{errorRound: int(0 + 1)}, nil, nil)
 	require.Equal(t, time.Duration(0), s.SynchronizingTime())
-	atomic.StoreInt64(&s.syncStartNS, 1000000)
+	s.syncStartNS.Store(1000000)
 	require.NotEqual(t, time.Duration(0), s.SynchronizingTime())
 }
diff --git a/cmd/tealdbg/cdtSession.go b/cmd/tealdbg/cdtSession.go
index f7b74eb9ec..5d0bdd08be 100644
--- a/cmd/tealdbg/cdtSession.go
+++ b/cmd/tealdbg/cdtSession.go
@@ -51,8 +51,8 @@ type cdtSession struct {
 	verbose bool
 }
-var contextCounter int32 = 0
-var scriptCounter int32 = 0
+var contextCounter atomic.Int32
+var scriptCounter atomic.Int32
 func makeCdtSession(uuid string, debugger Control, ch chan Notification) *cdtSession {
 	s := new(cdtSession)
@@ -60,8 +60,8 @@ func makeCdtSession(uuid string, debugger Control, ch chan Notification) *cdtSes
 	s.debugger = debugger
 	s.notifications = ch
 	s.done = make(chan struct{})
-	s.contextID = int(atomic.AddInt32(&contextCounter, 1))
-	s.scriptID = strconv.Itoa(int(atomic.AddInt32(&scriptCounter, 1)))
+	s.contextID = int(contextCounter.Add(1))
+	s.scriptID = strconv.Itoa(int(scriptCounter.Add(1)))
 	return s
 }
diff --git a/cmd/tealdbg/cdtSession_test.go b/cmd/tealdbg/cdtSession_test.go
index e4cae925cd..186e0d7df4 100644
--- a/cmd/tealdbg/cdtSession_test.go
+++ b/cmd/tealdbg/cdtSession_test.go
@@ -521,9 +521,7 @@ func TestCdtSessionGetObjects(t *testing.T) {
 			{Type: basics.TealUintType, Uint: 1},
 			{Type: basics.TealBytesType, Bytes: "\x01\x02"},
 		},
-		pc:   atomicInt{1},
-		line: atomicInt{1},
-		err:  e,
+		err: e,
 		AppState: AppState{
 			appIdx: basics.AppIndex(1),
 			schemas: basics.StateSchemas{
@@ -582,6 +580,8 @@ func TestCdtSessionGetObjects(t *testing.T) {
 			},
 		},
 	}
+	state.pc.Store(1)
+	state.line.Store(1)
 	req.Method = "Runtime.getProperties"
 	req.Params = map[string]interface{}{}
diff --git a/cmd/tealdbg/util.go b/cmd/tealdbg/util.go
index d91220e71c..971a08223a 100644
--- a/cmd/tealdbg/util.go
+++ b/cmd/tealdbg/util.go
@@ -44,35 +44,31 @@ func (s *atomicString) Length() int {
 }
 type atomicBool struct {
-	value uint32
+	value atomic.Bool
 }
 func (b *atomicBool) SetTo(other bool) {
-	var converted uint32 = 0
-	if other {
-		converted = 1
-	}
-	atomic.StoreUint32(&b.value, converted)
+	b.value.Store(other)
 }
 func (b *atomicBool) IsSet() bool {
-	return atomic.LoadUint32(&b.value) != 0
+	return b.value.Load()
 }
 type atomicInt struct {
-	value int32
+	value atomic.Int32
 }
 func (i *atomicInt) Store(other int) {
-	atomic.StoreInt32(&i.value, int32(other))
+	i.value.Store(int32(other))
 }
 func (i *atomicInt) Load() int {
-	return int(atomic.LoadInt32(&i.value))
+	return int(i.value.Load())
 }
 func (i *atomicInt) Add(other int) int {
-	return int(atomic.AddInt32(&i.value, int32(other)))
+	return int(i.value.Add(int32(other)))
 }
 // IsText checks if the input has all printable characters with strconv.IsPrint
diff --git a/crypto/merklearray/worker.go b/crypto/merklearray/worker.go
index 2a8059336f..b6d273e37b 100644
--- a/crypto/merklearray/worker.go
+++ b/crypto/merklearray/worker.go
@@ -28,7 +28,7 @@ type workerState struct {
 	// maxidx is the total number of elements to process, and nextidx
 	// is the next element that a worker should process.
 	maxidx  uint64
-	nextidx uint64
+	nextidx atomic.Uint64
 	// nworkers is the number of workers that can be started.
 	// This field gets decremented once workers are launched,
@@ -65,7 +65,7 @@ func newWorkerState(max uint64) *workerState {
 // by delta. This implicitly means that the worker that calls next
 // is promising to process delta elements at the returned position.
 func (ws *workerState) next(delta uint64) uint64 {
-	return atomic.AddUint64(&ws.nextidx, delta) - delta
+	return ws.nextidx.Add(delta) - delta
 }
 // wait waits for all of the workers to finish.
@@ -82,7 +82,7 @@ func (ws *workerState) nextWorker() bool {
 	_ = <-ws.starting
-	curidx := atomic.LoadUint64(&ws.nextidx)
+	curidx := ws.nextidx.Load()
 	if curidx >= ws.maxidx {
 		return false
 	}
diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go
index a03baea4f0..47deae4d32 100644
--- a/data/pools/transactionPool.go
+++ b/data/pools/transactionPool.go
@@ -49,9 +49,6 @@ import (
 // TransactionPool.AssembleBlock constructs a valid block for
 // proposal given a deadline.
 type TransactionPool struct {
-	// feePerByte is stored at the beginning of this struct to ensure it has a 64 bit aligned address. This is needed as it's being used
-	// with atomic operations which require 64 bit alignment on arm.
-	feePerByte uint64
 	// const
 	logProcessBlockStats bool
@@ -65,6 +62,7 @@ type TransactionPool struct {
 	expiredTxCount         map[basics.Round]int
 	pendingBlockEvaluator  BlockEvaluator
 	numPendingWholeBlocks  basics.Round
+	feePerByte             atomic.Uint64
 	feeThresholdMultiplier uint64
 	statusCache            *statusCache
@@ -295,7 +293,7 @@ func (pool *TransactionPool) checkPendingQueueSize(txnGroup []transactions.Signe
 // FeePerByte returns the current minimum microalgos per byte a transaction
 // needs to pay in order to get into the pool.
 func (pool *TransactionPool) FeePerByte() uint64 {
-	return atomic.LoadUint64(&pool.feePerByte)
+	return pool.feePerByte.Load()
 }
 // computeFeePerByte computes and returns the current minimum microalgos per byte a transaction
@@ -332,7 +330,7 @@ func (pool *TransactionPool) computeFeePerByte() uint64 {
 	}
 	// Update the counter for fast reads
-	atomic.StoreUint64(&pool.feePerByte, feePerByte)
+	pool.feePerByte.Store(feePerByte)
 	return feePerByte
 }
diff --git a/ledger/bulletin.go b/ledger/bulletin.go
index 8114fefa14..039fb3376e 100644
--- a/ledger/bulletin.go
+++ b/ledger/bulletin.go
@@ -31,17 +31,17 @@ import (
 // notifier is a struct that encapsulates a single-shot channel; it will only be signaled once.
 type notifier struct {
 	signal   chan struct{}
-	notified uint32
+	notified *atomic.Uint32
 }
 // makeNotifier constructs a notifier that has not been signaled.
 func makeNotifier() notifier {
-	return notifier{signal: make(chan struct{}), notified: 0}
+	return notifier{signal: make(chan struct{}), notified: &atomic.Uint32{}}
 }
 // notify signals the channel if it hasn't already done so
 func (notifier *notifier) notify() {
-	if atomic.CompareAndSwapUint32(&notifier.notified, 0, 1) {
+	if notifier.notified.CompareAndSwap(0, 1) {
 		close(notifier.signal)
 	}
 }
diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go
index d3bf1f87ea..dd576fc41f 100644
--- a/ledger/catchpointtracker.go
+++ b/ledger/catchpointtracker.go
@@ -129,7 +129,7 @@ type catchpointTracker struct {
 	// catchpointDataWriting helps to synchronize the (first stage) catchpoint data file
 	// writing. When this atomic variable is 0, no writing is going on.
 	// Any non-zero value indicates a catchpoint being written, or scheduled to be written.
-	catchpointDataWriting int32
+	catchpointDataWriting atomic.Int32
 	// The Trie tracking the current account balances. Always matches the balances that were
 	// written to the database.
@@ -233,7 +233,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
 	catchpointGenerationStats.BalancesWriteTime = uint64(updatingBalancesDuration.Nanoseconds())
 	totalKVs, totalAccounts, totalChunks, biggestChunkLen, err = ct.generateCatchpointData(
 		ctx, dbRound, &catchpointGenerationStats, spVerificationEncodedData)
-	atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+	ct.catchpointDataWriting.Store(0)
 	if err != nil {
 		return err
 	}
@@ -347,7 +347,7 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
 	}
 	ct.roundDigest = nil
-	ct.catchpointDataWriting = 0
+	ct.catchpointDataWriting.Store(0)
 	// keep these channel closed if we're not generating catchpoint
 	ct.catchpointDataSlowWriting = make(chan struct{}, 1)
 	close(ct.catchpointDataSlowWriting)
@@ -500,7 +500,7 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {
 	if ct.enableGeneratingCatchpointFiles && dcc.catchpointFirstStage {
 		// store non-zero ( all ones ) into the catchpointWriting atomic variable to indicate that a catchpoint is being written
-		atomic.StoreInt32(&ct.catchpointDataWriting, int32(-1))
+		ct.catchpointDataWriting.Store(int32(-1))
 	}
 	dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
@@ -516,7 +516,7 @@ func (ct *catchpointTracker) commitRound(ctx context.Context, tx trackerdb.Trans
 	defer func() {
 		if err != nil && dcc.catchpointFirstStage && ct.enableGeneratingCatchpointFiles {
-			atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+			ct.catchpointDataWriting.Store(0)
 		}
 	}()
@@ -963,7 +963,7 @@ func (ct *catchpointTracker) cancelWrite(dcc *deferredCommitContext) {
 		// determine if this was a catchpoint round
 		if dcc.catchpointFirstStage {
 			// it was a catchpoint round, so update the catchpointWriting to indicate that we're done.
-			atomic.StoreInt32(&ct.catchpointDataWriting, 0)
+			ct.catchpointDataWriting.Store(0)
 		}
 	}
 }
@@ -1117,7 +1117,7 @@ func (ct *catchpointTracker) accountsUpdateBalances(accountsDeltas compactAccoun
 // isWritingCatchpointDataFile returns true iff a (first stage) catchpoint data file
 // is being generated.
 func (ct *catchpointTracker) isWritingCatchpointDataFile() bool {
-	return atomic.LoadInt32(&ct.catchpointDataWriting) != 0
+	return ct.catchpointDataWriting.Load() != 0
 }
 // Generates a (first stage) catchpoint data file.
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index 4e14a1ab1d..4516a52e2c 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -53,11 +53,11 @@ func TestCatchpointIsWritingCatchpointFile(t *testing.T) {
 	ct := &catchpointTracker{}
-	ct.catchpointDataWriting = -1
+	ct.catchpointDataWriting.Store(-1)
 	ans := ct.isWritingCatchpointDataFile()
 	require.True(t, ans)
-	ct.catchpointDataWriting = 0
+	ct.catchpointDataWriting.Store(0)
 	ans = ct.isWritingCatchpointDataFile()
 	require.False(t, ans)
 }
@@ -762,7 +762,7 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
 	require.NotZero(t, len(ct.roundDigest))
 	require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
 	require.Zero(t, len(ct.roundDigest))
-	require.Zero(t, ct.catchpointDataWriting)
+	require.Zero(t, ct.catchpointDataWriting.Load())
 	select {
 	case _, closed := <-ct.catchpointDataSlowWriting:
 		require.False(t, closed)
@@ -777,7 +777,7 @@ type blockingTracker struct {
 	postCommitUnlockedReleaseLock chan struct{}
 	postCommitEntryLock           chan struct{}
 	postCommitReleaseLock         chan struct{}
-	committedUpToRound            int64
+	committedUpToRound            atomic.Int64
 	alwaysLock                    atomic.Bool
 	shouldLockPostCommit          atomic.Bool
 	shouldLockPostCommitUnlocked  atomic.Bool
@@ -794,7 +794,7 @@ func (bt *blockingTracker) newBlock(blk bookkeeping.Block, delta ledgercore.Stat
 // committedUpTo in the blockingTracker just stores the committed round.
 func (bt *blockingTracker) committedUpTo(committedRnd basics.Round) (minRound, lookback basics.Round) {
-	atomic.StoreInt64(&bt.committedUpToRound, int64(committedRnd))
+	bt.committedUpToRound.Store(int64(committedRnd))
 	return committedRnd, basics.Round(0)
 }
@@ -906,7 +906,7 @@ func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
 	require.NoError(t, err)
 	// wait for the committedUpToRound to be called with the correct round number.
 	for {
-		committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+		committedUpToRound := writeStallingTracker.committedUpToRound.Load()
 		if basics.Round(committedUpToRound) == ledger.Latest() {
 			break
 		}
@@ -948,7 +948,7 @@ func TestCatchpointTrackerNonblockingCatchpointWriting(t *testing.T) {
 	require.NoError(t, err)
 	// wait for the committedUpToRound to be called with the correct round number.
 	for {
-		committedUpToRound := atomic.LoadInt64(&writeStallingTracker.committedUpToRound)
+		committedUpToRound := writeStallingTracker.committedUpToRound.Load()
 		if basics.Round(committedUpToRound) == ledger.Latest() {
 			break
 		}
diff --git a/ledger/eval/prefetcher/prefetcher.go b/ledger/eval/prefetcher/prefetcher.go
index 40702b0088..2187fa1b5c 100644
--- a/ledger/eval/prefetcher/prefetcher.go
+++ b/ledger/eval/prefetcher/prefetcher.go
@@ -102,13 +102,10 @@ func PrefetchAccounts(ctx context.Context, l Ledger, rnd basics.Round, txnGroups
 type groupTask struct {
 	// incompleteCount is the number of resources+balances still pending and need to be loaded
 	// this variable is used by as atomic variable to synchronize the readiness of the group taks.
-	// in order to ensure support on 32-bit platforms, this variable need to be 64-bit aligned.
-	incompleteCount int64
+	incompleteCount atomic.Int64
 	// the group task index - aligns with the index of the transaction group in the
-	// provided groups slice. The usage of int64 here is to made sure the size of the
-	// structure is 64-bit aligned. If this not the case, then it would fail the atomic
-	// operations on the incompleteCount on 32-bit systems.
-	groupTaskIndex int64
+	// provided groups slice.
+	groupTaskIndex atomic.Int64
 	// balances contains the loaded balances each transaction group have
 	balances []LoadedAccountDataEntry
 	// balancesCount is the number of balances that nees to be loaded per transaction group
@@ -385,21 +382,22 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) {
 	const dependencyFreeGroup = -int64(^uint64(0)/2) - 1
 	for grpIdx := range groupsReady {
 		gr := groupsReady[grpIdx]
-		gr.groupTaskIndex = int64(grpIdx)
-		gr.incompleteCount = int64(gr.balancesCount + gr.resourcesCount)
+		gr.groupTaskIndex.Store(int64(grpIdx))
+		gr.incompleteCount.Store(int64(gr.balancesCount + gr.resourcesCount))
 		gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount]
 		if gr.resourcesCount > 0 {
 			gr.resources = allResources[usedResources : usedResources+gr.resourcesCount]
 			usedResources += gr.resourcesCount
 		}
 		usedBalances += gr.balancesCount
-		if gr.incompleteCount == 0 {
-			gr.incompleteCount = dependencyFreeGroup
+		if gr.incompleteCount.Load() == 0 {
+			gr.incompleteCount.Store(dependencyFreeGroup)
 		}
 	}
-	taskIdx := int64(-1)
-	defer atomic.StoreInt64(&taskIdx, tasksCount)
+	var taskIdx atomic.Int64
+	taskIdx.Store(-1)
+	defer taskIdx.Store(tasksCount)
 	// create few go-routines to load asyncroniously the account data.
 	for i := 0; i < asyncAccountLoadingThreadCount; i++ {
 		go p.asyncPrefetchRoutine(&tasksQueue, &taskIdx, groupDoneCh)
 	}
@@ -409,7 +407,7 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) {
 	completed := make(map[int64]bool)
 	for i := int64(0); i < int64(len(p.txnGroups)); {
 	wait:
-		incompleteCount := atomic.LoadInt64(&groupsReady[i].incompleteCount)
+		incompleteCount := groupsReady[i].incompleteCount.Load()
 		if incompleteCount > 0 || (incompleteCount != dependencyFreeGroup && !completed[i]) {
 			select {
 			case done := <-groupDoneCh:
@@ -462,27 +460,27 @@ func (gt *groupTask) markCompletionAcct(idx int, br LoadedAccountDataEntry, groupDoneCh chan groupTaskDone) {
 	gt.balances[idx] = br
-	if atomic.AddInt64(&gt.incompleteCount, -1) == 0 {
-		groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex}
+	if gt.incompleteCount.Add(-1) == 0 {
+		groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex.Load()}
 	}
 }
 func (gt *groupTask) markCompletionResource(idx int, res LoadedResourcesEntry, groupDoneCh chan groupTaskDone) {
 	gt.resources[idx] = res
-	if atomic.AddInt64(&gt.incompleteCount, -1) == 0 {
-		groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex}
+	if gt.incompleteCount.Add(-1) == 0 {
+		groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex.Load()}
 	}
 }
 func (gt *groupTask) markCompletionAcctError(err error, task *preloaderTask, groupDoneCh chan groupTaskDone) {
 	for {
-		curVal := atomic.LoadInt64(&gt.incompleteCount)
+		curVal := gt.incompleteCount.Load()
 		if curVal <= 0 {
 			return
 		}
-		if atomic.CompareAndSwapInt64(&gt.incompleteCount, curVal, 0) {
+		if gt.incompleteCount.CompareAndSwap(curVal, 0) {
 			groupDoneCh <- groupTaskDone{
-				groupIdx: gt.groupTaskIndex,
+				groupIdx: gt.groupTaskIndex.Load(),
 				err:      err,
 				task:     task,
 			}
@@ -491,11 +489,11 @@
-func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *int64, groupDoneCh chan groupTaskDone) {
+func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *atomic.Int64, groupDoneCh chan groupTaskDone) {
 	var task *preloaderTask
 	var err error
 	for {
-		nextTaskIdx := atomic.AddInt64(taskIdx, 1)
+		nextTaskIdx := taskIdx.Add(1)
 		queue, task = queue.getTaskAtIndex(int(nextTaskIdx))
 		if task == nil {
 			// no more tasks.
diff --git a/network/netidentity.go b/network/netidentity.go
index 6414c5e897..940ea0a633 100644
--- a/network/netidentity.go
+++ b/network/netidentity.go
@@ -20,7 +20,6 @@ import (
 	"encoding/base64"
 	"fmt"
 	"net/http"
-	"sync/atomic"
 	"github.com/algorand/go-algorand/crypto"
 	"github.com/algorand/go-algorand/protocol"
@@ -329,7 +328,7 @@ func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
 	peer := message.Sender.(*wsPeer)
 	// avoid doing work (crypto and potentially taking a lock) if the peer is already verified
-	if atomic.LoadUint32(&peer.identityVerified) == 1 {
+	if peer.identityVerified.Load() == 1 {
 		return OutgoingMessage{}
 	}
 	localAddr, _ := peer.net.Address()
@@ -350,7 +349,7 @@ func identityVerificationHandler(message IncomingMessage) OutgoingMessage {
 		peer.log.With("remote", peer.OriginAddress()).With("local", localAddr).Warn("peer identity verification is incorrectly signed, disconnecting")
 		return OutgoingMessage{Action: Disconnect, reason: disconnectBadIdentityData}
 	}
-	atomic.StoreUint32(&peer.identityVerified, 1)
+	peer.identityVerified.Store(1)
 	// if the identity could not be claimed by this peer, it means the identity is in use
 	wn.peersLock.Lock()
 	ok := wn.identityTracker.setIdentity(peer)
diff --git a/network/netidentity_test.go b/network/netidentity_test.go
index 13731aaaeb..9222da4600 100644
--- a/network/netidentity_test.go
+++ b/network/netidentity_test.go
@@ -358,7 +358,8 @@ func TestIdentityTrackerSetIdentity(t *testing.T) {
 // Just tests that if a peer is already verified, it just returns OutgoingMessage{}
 func TestIdentityTrackerHandlerGuard(t *testing.T) {
 	partitiontest.PartitionTest(t)
-	p := wsPeer{identityVerified: uint32(1)}
+	p := wsPeer{}
+	p.identityVerified.Store(1)
 	msg := IncomingMessage{
 		Sender: &p,
 		Net:    &WebsocketNetwork{},
diff --git a/network/netprio.go b/network/netprio.go
index 378bea4c05..5cb122c11f 100644
--- a/network/netprio.go
+++ b/network/netprio.go
@@ -18,7 +18,6 @@ package network
 import (
 	"container/heap"
-	"sync/atomic"
 	"github.com/algorand/go-algorand/data/basics"
 	"github.com/algorand/go-algorand/protocol"
@@ -126,7 +125,7 @@ func (pt *prioTracker) setPriority(peer *wsPeer, addr basics.Address, weight uin
 	peer.prioAddress = addr
 	peer.prioWeight = weight
 	heap.Fix(peersHeap{wn}, peer.peerIndex)
-	atomic.AddInt32(&wn.peersChangeCounter, 1)
+	wn.peersChangeCounter.Add(1)
 }
 func (pt *prioTracker) removePeer(peer *wsPeer) {
diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go
index f36b0d3280..71188b2566 100644
--- a/network/p2pNetwork.go
+++ b/network/p2pNetwork.go
@@ -60,12 +60,12 @@ type P2PNetwork struct {
 	broadcaster msgBroadcaster
 	wsPeers     map[peer.ID]*wsPeer
 	wsPeersLock deadlock.RWMutex
-	wsPeersChangeCounter int32
+	wsPeersChangeCounter atomic.Int32
 	wsPeersConnectivityCheckTicker *time.Ticker
 }
 type p2pPeerStats struct {
-	txReceived uint64
+	txReceived atomic.Uint64
 }
 // NewP2PNetwork returns an instance of GossipNode that uses the p2p.Service
@@ -366,7 +366,7 @@ func (n *P2PNetwork) wsStreamHandler(ctx context.Context, peer peer.ID, stream n
 	n.wsPeersLock.Lock()
 	n.wsPeers[peer] = wsp
 	n.wsPeersLock.Unlock()
-	atomic.AddInt32(&n.wsPeersChangeCounter, 1)
+	n.wsPeersChangeCounter.Add(1)
 }
 // peerRemoteClose called from wsPeer to report that it has closed
@@ -375,7 +375,7 @@ func (n *P2PNetwork) peerRemoteClose(peer *wsPeer, reason disconnectReason) {
 	n.wsPeersLock.Lock()
 	delete(n.wsPeers, remotePeerID)
 	n.wsPeersLock.Unlock()
-	atomic.AddInt32(&n.wsPeersChangeCounter, 1)
+	n.wsPeersChangeCounter.Add(1)
 }
 func (n *P2PNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {
@@ -403,7 +403,7 @@ func (n *P2PNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {
 }
 func (n *P2PNetwork) getPeersChangeCounter() int32 {
-	return atomic.LoadInt32(&n.wsPeersChangeCounter)
+	return n.wsPeersChangeCounter.Load()
 }
 func (n *P2PNetwork) checkSlowWritingPeers() {}
@@ -453,10 +453,10 @@ func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *
 	n.peerStatsMu.Lock()
 	peerStats, ok := n.peerStats[peerID]
 	if !ok {
-		n.peerStats[peerID] = &p2pPeerStats{txReceived: 1}
-	} else {
-		peerStats.txReceived++
+		peerStats = &p2pPeerStats{}
+		n.peerStats[peerID] = peerStats
 	}
+	peerStats.txReceived.Add(1)
 	n.peerStatsMu.Unlock()
 	outmsg := n.handler.Handle(inmsg)
diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go
index 585d9aaa17..3b6d127596 100644
--- a/network/p2pNetwork_test.go
+++ b/network/p2pNetwork_test.go
@@ -104,7 +104,7 @@ func TestP2PSubmitTX(t *testing.T) {
 			if !ok {
 				return false
 			}
-			return atomic.LoadUint64(&netCpeerStatsA.txReceived) == 10
+			return netCpeerStatsA.txReceived.Load() == 10
 		},
 		1*time.Second,
 		50*time.Millisecond,
@@ -153,12 +153,12 @@ func TestP2PSubmitWS(t *testing.T) {
 	// now we should be connected in a line: B <-> A <-> C where both B and C are connected to A but not each other
 	testTag := protocol.AgreementVoteTag
-	var handlerCount uint32
+	var handlerCount atomic.Uint32
 	// Since we aren't using the transaction handler in this test, we need to register a pass-through handler
 	passThroughHandler := []TaggedMessageHandler{
 		{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
-			atomic.AddUint32(&handlerCount, 1)
+			handlerCount.Add(1)
 			return OutgoingMessage{Action: Broadcast}
 		})},
 	}
@@ -176,7 +176,7 @@ func TestP2PSubmitWS(t *testing.T) {
 	require.Eventually(
 		t,
 		func() bool {
-			return atomic.LoadUint32(&handlerCount) == 20
+			return handlerCount.Load() == 20
 		},
 		1*time.Second,
 		50*time.Millisecond,
diff --git a/network/wsNetwork.go b/network/wsNetwork.go
index 2cce632bed..05e7ba44ca 100644
--- a/network/wsNetwork.go
+++ b/network/wsNetwork.go
@@ -184,7 +184,7 @@ type WebsocketNetwork struct {
 	peersLock          deadlock.RWMutex
 	peers              []*wsPeer
-	peersChangeCounter int32 // peersChangeCounter is an atomic variable that increases on each change to the peers. It helps avoiding taking the peersLock when checking if the peers list was modified.
+	peersChangeCounter atomic.Int32 // peersChangeCounter is an atomic variable that increases on each change to the peers. It helps avoiding taking the peersLock when checking if the peers list was modified.
 	broadcaster msgBroadcaster
 	handler     msgHandler
@@ -195,7 +195,7 @@ type WebsocketNetwork struct {
 	NetworkID protocol.NetworkID
 	RandomID  string
-	ready     int32
+	ready     atomic.Int32
 	readyChan chan struct{}
 	meshUpdateRequests chan meshRequest
@@ -244,7 +244,7 @@ type WebsocketNetwork struct {
 	lastNetworkAdvance time.Time
 	// number of throttled outgoing connections "slots" needed to be populated.
-	throttledOutgoingConnections int32
+	throttledOutgoingConnections atomic.Int32
 	// transport and dialer are customized to limit the number of
 	// connection in compliance with connectionsRateLimitingCount.
@@ -262,7 +262,7 @@ type WebsocketNetwork struct {
 	// further changes.
 	messagesOfInterestEnc     []byte
 	messagesOfInterestEncoded bool
-	messagesOfInterestGeneration uint32
+	messagesOfInterestGeneration atomic.Uint32
 	// messagesOfInterestMu protects messagesOfInterest and ensures
 	// that messagesOfInterestEnc does not change once it is set during
@@ -279,7 +279,7 @@ type WebsocketNetwork struct {
 	nodeInfo NodeInfo
 	// atomic {0:unknown, 1:yes, 2:no}
-	wantTXGossip uint32
+	wantTXGossip atomic.Uint32
 	// supportedProtocolVersions defines versions supported by this network.
 	// Should be used instead of a global network.SupportedProtocolVersions for network/peers configuration
@@ -606,7 +606,7 @@ func (wn *WebsocketNetwork) setup() {
 	wn.ctx, wn.ctxCancel = context.WithCancel(context.Background())
 	wn.relayMessages = wn.config.IsGossipServer() || wn.config.ForceRelayMessages
 	if wn.relayMessages || wn.config.ForceFetchTransactions {
-		wn.wantTXGossip = wantTXGossipYes
+		wn.wantTXGossip.Store(wantTXGossipYes)
 	}
 	// roughly estimate the number of messages that could be seen at any given moment.
 	// For the late/redo/down committee, which happen in parallel, we need to allocate
@@ -667,7 +667,7 @@ func (wn *WebsocketNetwork) setup() {
 	wn.protocolVersion = ProtocolVersion
 	wn.messagesOfInterestRefresh = make(chan struct{}, 2)
-	wn.messagesOfInterestGeneration = 1 // something nonzero so that any new wsPeer needs updating
+	wn.messagesOfInterestGeneration.Store(1) // something nonzero so that any new wsPeer needs updating
 	if wn.relayMessages {
 		wn.registerMessageInterest(protocol.StateProofSigTag)
 	}
@@ -694,13 +694,13 @@ func (wn *WebsocketNetwork) Start() {
 		// wrap the limited connection listener with a requests tracker listener
 		wn.listener = wn.requestsTracker.Listener(listener)
 		wn.log.Debugf("listening on %s", wn.listener.Addr().String())
-		wn.throttledOutgoingConnections = int32(wn.config.GossipFanout / 2)
+		wn.throttledOutgoingConnections.Store(int32(wn.config.GossipFanout / 2))
 	} else {
 		// on non-relay, all the outgoing connections are throttled.
-		wn.throttledOutgoingConnections = int32(wn.config.GossipFanout)
+		wn.throttledOutgoingConnections.Store(int32(wn.config.GossipFanout))
 	}
 	if wn.config.DisableOutgoingConnectionThrottling {
-		wn.throttledOutgoingConnections = 0
+		wn.throttledOutgoingConnections.Store(0)
 	}
 	if wn.config.TLSCertFile != "" && wn.config.TLSKeyFile != "" {
 		wn.scheme = "https"
@@ -1084,7 +1084,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
 		version:           matchingVersion,
 		identity:          peerID,
 		identityChallenge: peerIDChallenge,
-		identityVerified:  0,
+		identityVerified:  atomic.Uint32{},
 		features:          decodePeerFeatures(matchingVersion, request.Header.Get(PeerFeaturesHeader)),
 	}
 	peer.TelemetryGUID = trackedRequest.otherTelemetryGUID
@@ -1106,8 +1106,8 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt
 }
 func (wn *WebsocketNetwork) maybeSendMessagesOfInterest(peer *wsPeer, messagesOfInterestEnc []byte) {
-	messagesOfInterestGeneration := atomic.LoadUint32(&wn.messagesOfInterestGeneration)
-	peerMessagesOfInterestGeneration := atomic.LoadUint32(&peer.messagesOfInterestGeneration)
+	messagesOfInterestGeneration := wn.messagesOfInterestGeneration.Load()
+	peerMessagesOfInterestGeneration := peer.messagesOfInterestGeneration.Load()
 	if peerMessagesOfInterestGeneration != messagesOfInterestGeneration {
 		if messagesOfInterestEnc == nil {
 			wn.messagesOfInterestMu.Lock()
@@ -1361,7 +1361,7 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) {
 }
 func (wn *WebsocketNetwork) getPeersChangeCounter() int32 {
-	return atomic.LoadInt32(&wn.peersChangeCounter)
+	return wn.peersChangeCounter.Load()
 }
 // preparePeerData prepares batches of data for sending.
@@ -1779,12 +1779,12 @@ func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, pee
 		ConnectionDuration:   uint(now.Sub(peer.createTime).Seconds()),
 		TelemetryGUID:        peer.TelemetryGUID,
 		InstanceName:         peer.InstanceName,
-		DuplicateFilterCount: atomic.LoadUint64(&peer.duplicateFilterCount),
-		TXCount:              atomic.LoadUint64(&peer.txMessageCount),
-		MICount:              atomic.LoadUint64(&peer.miMessageCount),
-		AVCount:              atomic.LoadUint64(&peer.avMessageCount),
-		PPCount:              atomic.LoadUint64(&peer.ppMessageCount),
-		UNKCount:             atomic.LoadUint64(&peer.unkMessageCount),
+		DuplicateFilterCount: peer.duplicateFilterCount.Load(),
+		TXCount:              peer.txMessageCount.Load(),
+		MICount:              peer.miMessageCount.Load(),
+		AVCount:              peer.avMessageCount.Load(),
+		PPCount:              peer.ppMessageCount.Load(),
+		UNKCount:             peer.unkMessageCount.Load(),
 	}
 	if tcpInfo, err := peer.GetUnderlyingConnTCPInfo(); err == nil && tcpInfo != nil {
 		connDetail.TCP = *tcpInfo
@@ -1822,7 +1822,7 @@ func (wn *WebsocketNetwork) prioWeightRefresh() {
 			return
 		}
-		if curPeersChangeCounter := atomic.LoadInt32(&wn.peersChangeCounter); curPeersChangeCounter != lastPeersChangeCounter {
+		if curPeersChangeCounter := wn.peersChangeCounter.Load(); curPeersChangeCounter != lastPeersChangeCounter {
 			peers, lastPeersChangeCounter = wn.peerSnapshot(peers)
 		}
@@ -2141,10 +2141,10 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {
 	}
 	throttledConnection := false
-	if atomic.AddInt32(&wn.throttledOutgoingConnections, int32(-1)) >= 0 {
+	if wn.throttledOutgoingConnections.Add(int32(-1)) >= 0 {
 		throttledConnection = true
 	} else {
-		atomic.AddInt32(&wn.throttledOutgoingConnections, int32(1))
+		wn.throttledOutgoingConnections.Add(int32(1))
 	}
 	peer := &wsPeer{
@@ -2164,7 +2164,7 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {
 	// if there is a final verification message to send, it means this peer has a verified identity,
 	// attempt to set the peer and identityTracker
 	if len(idVerificationMessage) > 0 {
-		atomic.StoreUint32(&peer.identityVerified, uint32(1))
+		peer.identityVerified.Store(uint32(1))
 		wn.peersLock.Lock()
 		ok := wn.identityTracker.setIdentity(peer)
 		wn.peersLock.Unlock()
@@ -2311,10 +2311,10 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) {
 			telemetryspec.DisconnectPeerEventDetails{
 				PeerEventDetails: eventDetails,
 				Reason:           string(reason),
-				TXCount:          atomic.LoadUint64(&peer.txMessageCount),
-				MICount:          atomic.LoadUint64(&peer.miMessageCount),
-				AVCount:          atomic.LoadUint64(&peer.avMessageCount),
-				PPCount:          atomic.LoadUint64(&peer.ppMessageCount),
+				TXCount:          peer.txMessageCount.Load(),
+				MICount:          peer.miMessageCount.Load(),
+				AVCount:          peer.avMessageCount.Load(),
+				PPCount:          peer.ppMessageCount.Load(),
 			})
 	peers.Set(uint64(wn.NumPeers()))
@@ -2328,9 +2328,9 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) {
 		wn.prioTracker.removePeer(peer)
 		wn.identityTracker.removeIdentity(peer)
 		if peer.throttledOutgoingConnection {
-			atomic.AddInt32(&wn.throttledOutgoingConnections, int32(1))
+			wn.throttledOutgoingConnections.Add(int32(1))
 		}
-		atomic.AddInt32(&wn.peersChangeCounter, 1)
+		wn.peersChangeCounter.Add(1)
 	}
 	wn.countPeersSetGauges()
 }
@@ -2339,7 +2339,7 @@ func (wn *WebsocketNetwork) addPeer(peer *wsPeer) {
 	wn.peersLock.Lock()
 	defer wn.peersLock.Unlock()
 	// guard against peers which are closed or closing
-	if atomic.LoadInt32(&peer.didSignalClose) == 1 {
+	if peer.didSignalClose.Load() == 1 {
 		networkPeerAlreadyClosed.Inc(nil)
 		wn.log.Debugf("peer closing %s", peer.conn.RemoteAddrString())
 		return
@@ -2354,15 +2354,15 @@ func (wn *WebsocketNetwork) addPeer(peer *wsPeer) {
 	}
 	heap.Push(peersHeap{wn}, peer)
 	wn.prioTracker.setPriority(peer, peer.prioAddress, peer.prioWeight)
-	atomic.AddInt32(&wn.peersChangeCounter, 1)
+	wn.peersChangeCounter.Add(1)
 	wn.countPeersSetGauges()
 	if len(wn.peers) >= wn.config.GossipFanout {
 		// we have a quorum of connected peers, if we weren't ready before, we are now
-		if atomic.CompareAndSwapInt32(&wn.ready, 0, 1) {
+		if wn.ready.CompareAndSwap(0, 1) {
 			wn.log.Debug("ready")
 			close(wn.readyChan)
 		}
-	} else if atomic.LoadInt32(&wn.ready) == 0 {
+	} else if wn.ready.Load() == 0 {
 		// but if we're not ready in a minute, call whatever peers we've got as good enough
 		wn.wg.Add(1)
 		go wn.eventualReady()
@@ -2375,7 +2375,7 @@ func (wn *WebsocketNetwork) eventualReady() {
 	select {
 	case <-wn.ctx.Done():
 	case <-minute.C:
-		if atomic.CompareAndSwapInt32(&wn.ready, 0, 1) {
+		if wn.ready.CompareAndSwap(0, 1) {
 			wn.log.Debug("ready")
 			close(wn.readyChan)
 		}
@@ -2452,7 +2452,7 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() {
 	// must run inside wn.messagesOfInterestMu.Lock
 	wn.messagesOfInterestEnc = MarshallMessageOfInterestMap(wn.messagesOfInterest)
 	wn.messagesOfInterestEncoded = true
-	atomic.AddUint32(&wn.messagesOfInterestGeneration, 1)
+	wn.messagesOfInterestGeneration.Add(1)
 	var peers []*wsPeer
 	peers, _ = wn.peerSnapshot(peers)
 	wn.log.Infof("updateMessagesOfInterestEnc maybe sending messagesOfInterest %v", wn.messagesOfInterest)
@@ -2466,14 +2466,14 @@ func (wn *WebsocketNetwork) postMessagesOfInterestThread() {
 		<-wn.messagesOfInterestRefresh
 		// if we're not a relay, and not participating, we don't need txn pool
 		wantTXGossip := wn.nodeInfo.IsParticipating()
-		if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) {
+		if wantTXGossip && (wn.wantTXGossip.Load() != wantTXGossipYes) {
 			wn.log.Infof("postMessagesOfInterestThread: enabling TX gossip")
 			wn.registerMessageInterest(protocol.TxnTag)
-			atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes)
-		} else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) {
+			wn.wantTXGossip.Store(wantTXGossipYes)
+		} else if !wantTXGossip && (wn.wantTXGossip.Load() != wantTXGossipNo) {
 			wn.log.Infof("postMessagesOfInterestThread: disabling TX gossip")
 			wn.DeregisterMessageInterest(protocol.TxnTag)
-			atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo)
+			wn.wantTXGossip.Store(wantTXGossipNo)
 		}
 	}
 }
diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go
index 099b04e895..445ede3dc3 100644
--- a/network/wsNetwork_test.go
+++ b/network/wsNetwork_test.go
@@ -173,7 +173,7 @@ type messageCounterHandler struct {
 	// For deterministically simulating slow handlers, block until test code says to go.
 	release    sync.Cond
-	shouldWait int32
+	shouldWait atomic.Int32
 	waitcount  int
 }
@@ -186,7 +186,7 @@ func (mch *messageCounterHandler) Handle(message IncomingMessage) OutgoingMessag
 		dnanos := now - sent
 		mch.t.Logf("msg trans time %dns", dnanos)
 	}
-	if atomic.LoadInt32(&mch.shouldWait) > 0 {
+	if mch.shouldWait.Load() > 0 {
 		mch.waitcount++
 		mch.release.Wait()
 		mch.waitcount--
@@ -779,7 +779,8 @@ func TestSlowHandlers(t *testing.T) {
 	slowTag := protocol.Tag("sl")
 	fastTag := protocol.Tag("fa")
-	slowCounter := messageCounterHandler{shouldWait: 1}
+	slowCounter := messageCounterHandler{}
+	slowCounter.shouldWait.Store(1)
 	slowCounter.release.L = &slowCounter.lock
 	fastCounter := messageCounterHandler{target: incomingThreads}
 	fastCounter.done = make(chan struct{})
@@ -856,7 +857,8 @@ func TestFloodingPeer(t *testing.T) {
 	t.Skip("flaky test")
 	slowTag := protocol.Tag("sl")
 	fastTag := protocol.Tag("fa")
-	slowCounter := messageCounterHandler{shouldWait: 1}
+	slowCounter := messageCounterHandler{}
+	slowCounter.shouldWait.Store(1)
 	slowCounter.release.L = &slowCounter.lock
 	fastCounter := messageCounterHandler{}
 	slowHandler := TaggedMessageHandler{Tag: slowTag, MessageHandler: &slowCounter}
@@ -903,7 +905,7 @@ func TestFloodingPeer(t *testing.T) {
 	defer cancel()
 	defer func() {
 		t.Log("release slow handlers")
-		atomic.StoreInt32(&slowCounter.shouldWait, 0)
+		slowCounter.shouldWait.Store(0)
 		slowCounter.Broadcast()
 	}()
@@ -929,7 +931,7 @@ func TestFloodingPeer(t *testing.T) {
 }
 func peerIsClosed(peer *wsPeer) bool {
-	return atomic.LoadInt32(&peer.didInnerClose) != 0
+	return peer.didInnerClose.Load() != 0
 }
 func avgSendBufferHighPrioLength(wn *WebsocketNetwork) float64 {
@@ -2560,7 +2562,7 @@ func TestSlowPeerDisconnection(t *testing.T) {
 	}
 	// modify the peer on netA and
 	beforeLoopTime := time.Now()
-	atomic.StoreInt64(&peer.intermittentOutgoingMessageEnqueueTime, beforeLoopTime.Add(-maxMessageQueueDuration).Add(time.Second).UnixNano())
+	peer.intermittentOutgoingMessageEnqueueTime.Store(beforeLoopTime.Add(-maxMessageQueueDuration).Add(time.Second).UnixNano())
 	// wait up to 10 seconds for the monitor to figure out it needs to disconnect.
 	expire = beforeLoopTime.Add(2 * slowWritingPeerMonitorInterval)
 	for {
@@ -2875,7 +2877,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
 	msgCounters := make(map[protocol.Tag]int)
 	expectedCounts := make(map[protocol.Tag]int)
 	expectedCounts[ft2] = 5
-	var failed uint32
+	var failed atomic.Uint32
 	messageArriveWg := sync.WaitGroup{}
 	msgHandler := func(msg IncomingMessage) (out OutgoingMessage) {
 		t.Logf("A->B %s", msg.Tag)
@@ -2883,7 +2885,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
 		defer incomingMsgSync.Unlock()
 		expected := expectedCounts[msg.Tag]
 		if expected < 1 {
-			atomic.StoreUint32(&failed, 1)
+			failed.Store(1)
 			t.Logf("UNEXPECTED A->B %s", msg.Tag)
 			return
 		}
@@ -2931,7 +2933,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
 	messageArriveWg.Add(5) // we're expecting exactly 5 messages.
 	// send 5 messages of few types.
 	for i := 0; i < 5; i++ {
-		if atomic.LoadUint32(&failed) != 0 {
+		if failed.Load() != 0 {
 			t.Errorf("failed")
 			break
 		}
@@ -2940,7 +2942,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
 		netA.Broadcast(context.Background(), ft2, []byte{0, 1, 2, 3, 4}, true, nil)
 		netA.Broadcast(context.Background(), ft4, []byte{0, 1, 2, 3, 4}, true, nil) // NOT in MOI
 	}
-	if atomic.LoadUint32(&failed) != 0 {
+	if failed.Load() != 0 {
 		t.Errorf("failed")
 	}
 	// wait until all the expected messages arrive.
@@ -2949,7 +2951,7 @@ func TestWebsocketNetworkMessageOfInterest(t *testing.T) {
 	defer incomingMsgSync.Unlock()
 	require.Equal(t, 1, len(msgCounters))
 	for tag, count := range msgCounters {
-		if atomic.LoadUint32(&failed) != 0 {
+		if failed.Load() != 0 {
 			t.Errorf("failed")
 			break
 		}
@@ -3155,7 +3157,7 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
 	netB.Start()
 	defer netStop(t, netB, "B")
 	require.False(t, netB.relayMessages)
-	require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip))
+	require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip.Load())
 	incomingMsgSync := deadlock.Mutex{}
 	msgCounters := make(map[protocol.Tag]int)
@@ -3197,12 +3199,12 @@ func TestWebsocketNetworkTXMessageOfInterestNPN(t *testing.T) {
 	netB.OnNetworkAdvance()
 	waitForMOIRefreshQuiet(netB)
 	for i := 0; i < 100; i++ {
-		if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipNo) {
+		if netB.wantTXGossip.Load() == uint32(wantTXGossipNo) {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
 	}
-	require.Equal(t, uint32(wantTXGossipNo), atomic.LoadUint32(&netB.wantTXGossip))
+	require.Equal(t, uint32(wantTXGossipNo), netB.wantTXGossip.Load())
 	// send another message which we can track, so that we'll know that the first message was delivered.
 	netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
 	messageFilterArriveWg.Wait()
@@ -3260,7 +3262,7 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) {
 	netB.Start()
 	defer netStop(t, netB, "B")
 	require.False(t, netB.relayMessages)
-	require.Equal(t, uint32(wantTXGossipUnk), atomic.LoadUint32(&netB.wantTXGossip))
+	require.Equal(t, uint32(wantTXGossipUnk), netB.wantTXGossip.Load())
 	incomingMsgSync := deadlock.Mutex{}
 	msgCounters := make(map[protocol.Tag]int)
@@ -3302,12 +3304,12 @@ func TestWebsocketNetworkTXMessageOfInterestPN(t *testing.T) {
 	netB.OnNetworkAdvance()
 	waitForMOIRefreshQuiet(netB)
 	for i := 0; i < 100; i++ {
-		if atomic.LoadUint32(&netB.wantTXGossip) == uint32(wantTXGossipYes) {
+		if netB.wantTXGossip.Load() == uint32(wantTXGossipYes) {
 			break
 		}
 		time.Sleep(10 * time.Millisecond)
 	}
-	require.Equal(t, uint32(wantTXGossipYes), atomic.LoadUint32(&netB.wantTXGossip))
+	require.Equal(t, uint32(wantTXGossipYes), netB.wantTXGossip.Load())
 	// send another message which we can track, so that we'll know that the first message was delivered.
 	netB.Broadcast(context.Background(), protocol.AgreementVoteTag, []byte{0, 1, 2, 3, 4}, true, nil)
 	messageFilterArriveWg.Wait()
@@ -3390,9 +3392,9 @@ func testWebsocketDisconnection(t *testing.T, disconnectFunc func(wn *WebsocketN
 		return
 	}
-	var msgCounterNetB uint32
+	var msgCounterNetB atomic.Uint32
 	msgHandlerB := func(msg IncomingMessage) (out OutgoingMessage) {
-		if atomic.AddUint32(&msgCounterNetB, 1) == 5 {
+		if msgCounterNetB.Add(1) == 5 {
 			// disconnect
 			disconnectFunc(netB, &out)
 		} else {
@@ -3925,7 +3927,7 @@ func TestTryConnectEarlyWrite(t *testing.T) {
 	p := netA.peers[0]
 	var messageCount uint64
 	for x := 0; x < 1000; x++ {
-		messageCount = atomic.LoadUint64(&p.miMessageCount)
+		messageCount = p.miMessageCount.Load()
 		if messageCount == 1 {
 			break
 		}
@@ -3934,8 +3936,8 @@ func TestTryConnectEarlyWrite(t *testing.T) {
 	// Confirm that we successfuly received a message of interest
 	assert.Len(t, netA.peers, 1)
-	fmt.Printf("MI Message Count: %v\n", netA.peers[0].miMessageCount)
-	assert.Equal(t, uint64(1), netA.peers[0].miMessageCount)
+	fmt.Printf("MI Message Count: %v\n", netA.peers[0].miMessageCount.Load())
+	assert.Equal(t, uint64(1), netA.peers[0].miMessageCount.Load())
 }
 // Test functionality that allows a node to discard a block response that it did not request or that arrived too late.
@@ -4030,7 +4032,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
 		500*time.Millisecond,
 		20*time.Millisecond,
 	)
-	require.Equal(t, atomic.LoadInt64(&netC.peers[0].outstandingTopicRequests), int64(1))
+	require.Equal(t, netC.peers[0].outstandingTopicRequests.Load(), int64(1))
 	// Create a buffer to monitor log output from netC
 	logBuffer := bytes.NewBuffer(nil)
@@ -4040,7 +4042,7 @@ func TestDiscardUnrequestedBlockResponse(t *testing.T) {
 	netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
 	require.Eventually(
 		t,
-		func() bool { return atomic.LoadInt64(&netC.peers[0].outstandingTopicRequests) == int64(0) },
+		func() bool { return netC.peers[0].outstandingTopicRequests.Load() == int64(0) },
 		500*time.Millisecond,
 		20*time.Millisecond,
 	)
@@ -4422,7 +4424,7 @@ func TestSendMessageCallbacks(t *testing.T) {
 	netA, netB, _, closeFunc := setupWebsocketNetworkAB(t, 2)
 	defer closeFunc()
-	var counter uint64
+	var counter atomic.Uint64
 	require.NotZero(t, netA.NumPeers())
 	// peerB is netA's representation of netB and vice versa
@@ -4436,10 +4438,10 @@ func TestSendMessageCallbacks(t *testing.T) {
 	// and goes through the actual response code path to generate and send TS responses to netB
 	for i := 0; i < 100; i++ {
 		randInt := crypto.RandUint64()%(128) + 1
-		atomic.AddUint64(&counter, randInt)
+		counter.Add(randInt)
 		topic := MakeTopic("val", []byte("blah"))
 		callback := func() {
-			atomic.AddUint64(&counter, ^uint64(randInt-1))
+			counter.Add(^uint64(randInt - 1))
 		}
 		msg := IncomingMessage{Sender: peerB, Tag: protocol.UniEnsBlockReqTag}
 		peerB.Respond(context.Background(), msg, OutgoingMessage{OnRelease: callback, Topics: Topics{topic}})
@@ -4448,14 +4450,14 @@ func TestSendMessageCallbacks(t *testing.T) {
 	// of outstanding TS requests below 0. This will be true because we never made any UE block requests, we only
 	// simulated them by manually creating a IncomingMessage with the UE tag in the loop above
 	require.Eventually(t,
-		func() bool { return atomic.LoadInt64(&peerA.outstandingTopicRequests) < 0 },
+		func() bool { return peerA.outstandingTopicRequests.Load() < 0 },
 		500*time.Millisecond,
 		25*time.Millisecond,
 	)
 	// confirm that the test counter decrements down to zero correctly through callbacks
 	require.Eventually(t,
-		func() bool { return atomic.LoadUint64(&counter) == uint64(0) },
+		func() bool { return counter.Load() == uint64(0) },
 		500*time.Millisecond,
 		25*time.Millisecond,
 	)
diff --git a/network/wsPeer.go b/network/wsPeer.go
index 56f2b6a4f0..9daf7b0ece 100644
--- a/network/wsPeer.go
+++ b/network/wsPeer.go
@@ -202,27 +202,24 @@ type wsPeer struct {
 	// lastPacketTime contains the UnixNano at the last time a successful communication was made with the peer.
 	// "successful communication" above refers to either reading from or writing to a connection without receiving any
 	// error.
-	// we want this to be a 64-bit aligned for atomics support on 32bit platforms.
-	lastPacketTime int64
+	lastPacketTime atomic.Int64
 	// outstandingTopicRequests is an atomic counter for the number of outstanding block requests we've made out to this peer
 	// if a peer sends more blocks than we've requested, we'll disconnect from it.
-	outstandingTopicRequests int64
+	outstandingTopicRequests atomic.Int64
 	// intermittentOutgoingMessageEnqueueTime contains the UnixNano of the message's enqueue time that is currently being written to the
 	// peer, or zero if no message is being written.
-	intermittentOutgoingMessageEnqueueTime int64
+	intermittentOutgoingMessageEnqueueTime atomic.Int64
 	// Nonce used to uniquely identify requests
-	requestNonce uint64
+	requestNonce atomic.Uint64
 	// duplicateFilterCount counts how many times the remote peer has sent us a message hash
 	// to filter that it had already sent before.
-	// this needs to be 64-bit aligned for use with atomic.AddUint64 on 32-bit platforms.
-	duplicateFilterCount uint64
+	duplicateFilterCount atomic.Uint64
-	// These message counters need to be 64-bit aligned as well.
-	txMessageCount, miMessageCount, ppMessageCount, avMessageCount, unkMessageCount uint64
+	txMessageCount, miMessageCount, ppMessageCount, avMessageCount, unkMessageCount atomic.Uint64
 	wsPeerCore
@@ -239,8 +236,8 @@ type wsPeer struct {
 	wg sync.WaitGroup
-	didSignalClose int32
-	didInnerClose  int32
+	didSignalClose atomic.Int32
+	didInnerClose  atomic.Int32
 	TelemetryGUID string
 	InstanceName  string
@@ -262,7 +259,7 @@ type wsPeer struct {
 	// the peer's identity key which it uses for identityChallenge exchanges
 	identity crypto.PublicKey
-	identityVerified uint32
+	identityVerified atomic.Uint32
 	// the identityChallenge is recorded to the peer so it may verify its identity at a later time
 	identityChallenge identityChallengeValue
@@ -292,7 +289,7 @@ type wsPeer struct {
 	sendMessageTag map[protocol.Tag]bool
 	// messagesOfInterestGeneration is this node's messagesOfInterest version that we have seen to this peer.
-	messagesOfInterestGeneration uint32
+	messagesOfInterestGeneration atomic.Uint32
 	// connMonitor used to measure the relative performance of the connection
 	// compared to the other outgoing connections. Incoming connections would have this
@@ -457,7 +454,7 @@ func (wp *wsPeer) init(config config.Local, sendBufferLength int) {
 	wp.closing = make(chan struct{})
 	wp.sendBufferHighPrio = make(chan sendMessages, sendBufferLength)
 	wp.sendBufferBulk = make(chan sendMessages, sendBufferLength)
-	atomic.StoreInt64(&wp.lastPacketTime, time.Now().UnixNano())
+	wp.lastPacketTime.Store(time.Now().UnixNano())
 	wp.responseChannels = make(map[uint64]chan *Response)
 	wp.sendMessageTag = defaultSendMessageTags
 	wp.clientDataStore = make(map[string]interface{})
@@ -487,7 +484,7 @@ func (wp *wsPeer) OriginAddress() string {
 func (wp *wsPeer) reportReadErr(err error) {
 	// only report error if we haven't already closed the peer
-	if atomic.LoadInt32(&wp.didInnerClose) == 0 {
+	if wp.didInnerClose.Load() == 0 {
 		_, _, line, _ := runtime.Caller(1)
 		wp.log.Warnf("peer[%s] line=%d read err: %s", wp.conn.RemoteAddrString(), line, err)
 		networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "reader err"})
@@ -541,10 +538,10 @@ func (wp *wsPeer) readLoop() {
 		// Skip the message if it's a response to a request we didn't make or has timed out
 		if msg.Tag == protocol.TopicMsgRespTag && wp.lenResponseChannels() == 0 {
-			atomic.AddInt64(&wp.outstandingTopicRequests, -1)
+			wp.outstandingTopicRequests.Add(-1)
 			// This peers has sent us more responses than we have requested. This is a protocol violation and we should disconnect.
-			if atomic.LoadInt64(&wp.outstandingTopicRequests) < 0 {
+			if wp.outstandingTopicRequests.Load() < 0 {
 				wp.log.Errorf("wsPeer readloop: peer %s sent TS response without a request", wp.conn.RemoteAddrString())
 				networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "unrequestedTS"})
 				cleanupCloseError = disconnectUnexpectedTopicResp
@@ -578,7 +575,7 @@ func (wp *wsPeer) readLoop() {
 			return
 		}
 		msg.Net = wp.net
-		atomic.StoreInt64(&wp.lastPacketTime, msg.Received)
+		wp.lastPacketTime.Store(msg.Received)
 		networkReceivedBytesTotal.AddUint64(uint64(len(msg.Data)+2), nil)
 		networkMessageReceivedTotal.AddUint64(1, nil)
 		networkReceivedBytesByTag.Add(string(tag[:]), uint64(len(msg.Data)+2))
@@ -594,7 +591,7 @@ func (wp *wsPeer) readLoop() {
 		switch msg.Tag {
 		case protocol.MsgOfInterestTag:
 			// try to decode the message-of-interest
-			atomic.AddUint64(&wp.miMessageCount, 1)
+			wp.miMessageCount.Add(1)
 			if close, reason := wp.handleMessageOfInterest(msg); close {
 				cleanupCloseError = reason
 				if reason == disconnectBadData {
@@ -604,7 +601,7 @@ func (wp *wsPeer) readLoop() {
 			}
 			continue
 		case protocol.TopicMsgRespTag: // Handle Topic message
-			atomic.AddInt64(&wp.outstandingTopicRequests, -1)
+			wp.outstandingTopicRequests.Add(-1)
 			topics, err := UnmarshallTopics(msg.Data)
 			if err != nil {
 				wp.log.Warnf("wsPeer readLoop: could not read the message from: %s %s", wp.conn.RemoteAddrString(), err)
@@ -634,17 +631,17 @@ func (wp *wsPeer) readLoop() {
 			wp.handleFilterMessage(msg)
 			continue
 		case protocol.TxnTag:
-			atomic.AddUint64(&wp.txMessageCount, 1)
+			wp.txMessageCount.Add(1)
 		case protocol.AgreementVoteTag:
-			atomic.AddUint64(&wp.avMessageCount, 1)
+			wp.avMessageCount.Add(1)
 		case protocol.ProposalPayloadTag:
-			atomic.AddUint64(&wp.ppMessageCount, 1)
+			wp.ppMessageCount.Add(1)
 		// the remaining valid tags: no special handling here
 		case protocol.NetPrioResponseTag, protocol.PingTag, protocol.PingReplyTag, protocol.StateProofSigTag, protocol.UniEnsBlockReqTag, protocol.VoteBundleTag, protocol.NetIDVerificationTag:
 		default: // unrecognized tag
 			unknownProtocolTagMessagesTotal.Inc(nil)
-			atomic.AddUint64(&wp.unkMessageCount, 1)
+			wp.unkMessageCount.Add(1)
 			continue // drop message, skip adding it to queue
 			// TODO: should disconnect here?
 		}
@@ -740,7 +737,7 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) {
 		// large message concurrently from several peers, and then sent the filter message to us after
 		// each large message finished transferring.
duplicateNetworkFilterReceivedTotal.Inc(nil) - atomic.AddUint64(&wp.duplicateFilterCount, 1) + wp.duplicateFilterCount.Add(1) } } @@ -792,17 +789,17 @@ func (wp *wsPeer) writeLoopSendMsg(msg sendMessage) disconnectReason { return disconnectStaleWrite } - atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, msg.enqueued.UnixNano()) - defer atomic.StoreInt64(&wp.intermittentOutgoingMessageEnqueueTime, 0) + wp.intermittentOutgoingMessageEnqueueTime.Store(msg.enqueued.UnixNano()) + defer wp.intermittentOutgoingMessageEnqueueTime.Store(0) err := wp.conn.WriteMessage(websocket.BinaryMessage, msg.data) if err != nil { - if atomic.LoadInt32(&wp.didInnerClose) == 0 { + if wp.didInnerClose.Load() == 0 { wp.log.Warn("peer write error ", err) networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "write err"}) } return disconnectWriteError } - atomic.StoreInt64(&wp.lastPacketTime, time.Now().UnixNano()) + wp.lastPacketTime.Store(time.Now().UnixNano()) networkSentBytesTotal.AddUint64(uint64(len(msg.data)), nil) networkSentBytesByTag.Add(string(tag), uint64(len(msg.data))) networkMessageSentTotal.AddUint64(1, nil) @@ -936,7 +933,7 @@ func (wp *wsPeer) pingTimes() (lastPingSent time.Time, lastPingRoundTripTime tim // called when the connection had an error or closed remotely func (wp *wsPeer) internalClose(reason disconnectReason) { - if atomic.CompareAndSwapInt32(&wp.didSignalClose, 0, 1) { + if wp.didSignalClose.CompareAndSwap(0, 1) { wp.net.peerRemoteClose(wp, reason) } wp.Close(time.Now().Add(peerDisconnectionAckDuration)) @@ -944,8 +941,8 @@ func (wp *wsPeer) internalClose(reason disconnectReason) { // called either here or from above enclosing node logic func (wp *wsPeer) Close(deadline time.Time) { - atomic.StoreInt32(&wp.didSignalClose, 1) - if atomic.CompareAndSwapInt32(&wp.didInnerClose, 0, 1) { + wp.didSignalClose.Store(1) + if wp.didInnerClose.CompareAndSwap(0, 1) { close(wp.closing) err := wp.conn.CloseWithMessage(websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), deadline) if err != nil { @@ -984,11 +981,11 @@ func (wp *wsPeer) CloseAndWait(deadline time.Time) { } func (wp *wsPeer) GetLastPacketTime() int64 { - return atomic.LoadInt64(&wp.lastPacketTime) + return wp.lastPacketTime.Load() } func (wp *wsPeer) CheckSlowWritingPeer(now time.Time) bool { - ongoingMessageTime := atomic.LoadInt64(&wp.intermittentOutgoingMessageEnqueueTime) + ongoingMessageTime := wp.intermittentOutgoingMessageEnqueueTime.Load() if ongoingMessageTime == 0 { return false } @@ -1000,7 +997,7 @@ func (wp *wsPeer) CheckSlowWritingPeer(now time.Time) bool { // The value is stored on wsPeer func (wp *wsPeer) getRequestNonce() []byte { buf := make([]byte, binary.MaxVarintLen64) - binary.PutUvarint(buf, atomic.AddUint64(&wp.requestNonce, 1)) + binary.PutUvarint(buf, wp.requestNonce.Add(1)) return buf } @@ -1016,7 +1013,7 @@ func MakeNonceTopic(nonce uint64) Topic { func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error) { // Add nonce, stored on the wsPeer as the topic - nonceTopic := MakeNonceTopic(atomic.AddUint64(&wp.requestNonce, 1)) + nonceTopic := MakeNonceTopic(wp.requestNonce.Add(1)) topics = append(topics, nonceTopic) // serialize the topics @@ -1038,7 +1035,7 @@ func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Re ctx: context.Background()} select { case wp.sendBufferBulk <- sendMessages{msgs: msg}: - atomic.AddInt64(&wp.outstandingTopicRequests, 1) + wp.outstandingTopicRequests.Add(1) case <-wp.closing: e = 
fmt.Errorf("peer closing %s", wp.conn.RemoteAddrString()) return @@ -1102,7 +1099,7 @@ func (wp *wsPeer) sendMessagesOfInterest(messagesOfInterestGeneration uint32, me if err != nil { wp.log.Errorf("ws send msgOfInterest: %v", err) } else { - atomic.StoreUint32(&wp.messagesOfInterestGeneration, messagesOfInterestGeneration) + wp.messagesOfInterestGeneration.Store(messagesOfInterestGeneration) } } diff --git a/network/wsPeer_test.go b/network/wsPeer_test.go index 4853b95e32..b6f3a4d2f0 100644 --- a/network/wsPeer_test.go +++ b/network/wsPeer_test.go @@ -25,9 +25,9 @@ import ( "path/filepath" "sort" "strings" + "sync/atomic" "testing" "time" - "unsafe" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" @@ -41,17 +41,17 @@ func TestCheckSlowWritingPeer(t *testing.T) { now := time.Now() peer := wsPeer{ - intermittentOutgoingMessageEnqueueTime: 0, + intermittentOutgoingMessageEnqueueTime: atomic.Int64{}, wsPeerCore: wsPeerCore{net: &WebsocketNetwork{ log: logging.TestingLog(t), }}, } require.Equal(t, peer.CheckSlowWritingPeer(now), false) - peer.intermittentOutgoingMessageEnqueueTime = now.UnixNano() + peer.intermittentOutgoingMessageEnqueueTime.Store(now.UnixNano()) require.Equal(t, peer.CheckSlowWritingPeer(now), false) - peer.intermittentOutgoingMessageEnqueueTime = now.Add(-maxMessageQueueDuration * 2).UnixNano() + peer.intermittentOutgoingMessageEnqueueTime.Store(now.Add(-maxMessageQueueDuration * 2).UnixNano()) require.Equal(t, peer.CheckSlowWritingPeer(now), true) } @@ -99,24 +99,6 @@ func TestDefaultMessageTagsLength(t *testing.T) { } } -// TestAtomicVariablesAlignment ensures that the 64-bit atomic variables -// offsets are 64-bit aligned. This is required due to go atomic library -// limitation. -func TestAtomicVariablesAlignment(t *testing.T) { - partitiontest.PartitionTest(t) - - p := wsPeer{} - require.True(t, (unsafe.Offsetof(p.requestNonce)%8) == 0) - require.True(t, (unsafe.Offsetof(p.lastPacketTime)%8) == 0) - require.True(t, (unsafe.Offsetof(p.intermittentOutgoingMessageEnqueueTime)%8) == 0) - require.True(t, (unsafe.Offsetof(p.duplicateFilterCount)%8) == 0) - require.True(t, (unsafe.Offsetof(p.txMessageCount)%8) == 0) - require.True(t, (unsafe.Offsetof(p.miMessageCount)%8) == 0) - require.True(t, (unsafe.Offsetof(p.ppMessageCount)%8) == 0) - require.True(t, (unsafe.Offsetof(p.avMessageCount)%8) == 0) - require.True(t, (unsafe.Offsetof(p.unkMessageCount)%8) == 0) -} - func TestTagCounterFiltering(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/rpcs/blockService.go b/rpcs/blockService.go index 0f48f873c0..a3bf886f2b 100644 --- a/rpcs/blockService.go +++ b/rpcs/blockService.go @@ -104,7 +104,7 @@ type BlockService struct { closeWaitGroup sync.WaitGroup mu deadlock.Mutex memoryUsed uint64 - wsMemoryUsed uint64 + wsMemoryUsed atomic.Uint64 memoryCap uint64 } @@ -320,9 +320,9 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc outMsg := network.OutgoingMessage{Topics: respTopics} if n > 0 { outMsg.OnRelease = func() { - atomic.AddUint64(&bs.wsMemoryUsed, ^uint64(n-1)) + bs.wsMemoryUsed.Add(^uint64(n - 1)) } - atomic.AddUint64(&bs.wsMemoryUsed, (n)) + bs.wsMemoryUsed.Add(n) } err := target.Respond(ctx, reqMsg, outMsg) if err != nil { @@ -332,7 +332,7 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc // If we are over-capacity, we will not process the request // respond to sender with error message - memUsed := atomic.LoadUint64(&bs.wsMemoryUsed) + memUsed := 
bs.wsMemoryUsed.Load() if memUsed > bs.memoryCap { err := errMemoryAtCapacity{capacity: bs.memoryCap, used: memUsed} bs.log.Infof("BlockService handleCatchupReq: %s", err.Error()) diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index a0ba919c98..832b59c55b 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -452,7 +452,7 @@ func TestWsBlockLimiting(t *testing.T) { roundBin), } reqMsg.Data = topics.MarshallTopics() - require.Zero(t, bs1.wsMemoryUsed) + require.Zero(t, bs1.wsMemoryUsed.Load()) bs1.handleCatchupReq(context.Background(), reqMsg) // We should have received the message into the mock peer and the block service should have memoryUsed > 0 data, found := peer.responseTopics.GetValue(BlockDataKey) @@ -460,7 +460,7 @@ func TestWsBlockLimiting(t *testing.T) { blk, _, err := ledger.EncodedBlockCert(basics.Round(2)) require.NoError(t, err) require.Equal(t, data, blk) - require.Positive(t, bs1.wsMemoryUsed) + require.Positive(t, bs1.wsMemoryUsed.Load()) // Before making a new request save the callback since the new failed message will overwrite it in the mock peer callback := peer.outMsg.OnRelease @@ -474,7 +474,7 @@ func TestWsBlockLimiting(t *testing.T) { // Now call the callback to free up memUsed require.Nil(t, peer.outMsg.OnRelease) callback() - require.Zero(t, bs1.wsMemoryUsed) + require.Zero(t, bs1.wsMemoryUsed.Load()) } // TestRedirectExceptions tests exception cases: diff --git a/rpcs/ledgerService.go b/rpcs/ledgerService.go index 8abf87e3ba..a3ff63e90e 100644 --- a/rpcs/ledgerService.go +++ b/rpcs/ledgerService.go @@ -63,7 +63,7 @@ type LedgerForService interface { // LedgerService represents the Ledger RPC API type LedgerService struct { // running is non-zero once the service is running, and zero when it's not running. it needs to be at a 32-bit aligned address for RasPI support. 
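A hedged sketch of the wrap-around subtraction idiom that the BlockService hunks above keep: adding ^uint64(n-1), the two's complement of n, to an atomic.Uint64 decreases it by n, so memory reserved when a block response is queued is released again in OnRelease. The variable names below are illustrative, not the real BlockService fields.

// Illustrative sketch only; memoryUsed stands in for bs.wsMemoryUsed.
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var memoryUsed atomic.Uint64
	n := uint64(4096)

	memoryUsed.Add(n)              // reserve n bytes when the response is queued
	memoryUsed.Add(^uint64(n - 1)) // release them: ^(n-1) == -n modulo 2^64

	fmt.Println(memoryUsed.Load()) // prints 0
}

For a non-constant n, memoryUsed.Add(-n) behaves the same, since unary minus on an unsigned value also wraps. Similarly, the LedgerService hunk just below keeps running.Add(0) as an atomic read; with atomic.Int32 that returns the current value, matching running.Load().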
- running int32 + running atomic.Int32 ledger LedgerForService genesisID string net network.GossipNode @@ -89,14 +89,14 @@ func MakeLedgerService(config config.Local, ledger LedgerForService, net network // Start listening to catchup requests func (ls *LedgerService) Start() { if ls.enableService { - atomic.StoreInt32(&ls.running, 1) + ls.running.Store(1) } } // Stop servicing catchup requests func (ls *LedgerService) Stop() { if ls.enableService { - atomic.StoreInt32(&ls.running, 0) + ls.running.Store(0) ls.stopping.Wait() } } @@ -107,7 +107,7 @@ func (ls *LedgerService) Stop() { func (ls *LedgerService) ServeHTTP(response http.ResponseWriter, request *http.Request) { ls.stopping.Add(1) defer ls.stopping.Done() - if atomic.AddInt32(&ls.running, 0) == 0 { + if ls.running.Add(0) == 0 { response.WriteHeader(http.StatusNotFound) return } diff --git a/rpcs/ledgerService_test.go b/rpcs/ledgerService_test.go index 6b01cf0e16..1285795d4c 100644 --- a/rpcs/ledgerService_test.go +++ b/rpcs/ledgerService_test.go @@ -82,7 +82,7 @@ func TestLedgerService(t *testing.T) { ledgerService := MakeLedgerService(cfg, &l, &fnet, genesisID) fnet.AssertNotCalled(t, "RegisterHTTPHandler", LedgerServiceLedgerPath, ledgerService) ledgerService.Start() - require.Equal(t, int32(0), ledgerService.running) + require.Equal(t, int32(0), ledgerService.running.Load()) // Test GET 404 rr := httptest.NewRecorder() @@ -97,7 +97,7 @@ func TestLedgerService(t *testing.T) { ledgerService = MakeLedgerService(cfg, &l, &fnet, genesisID) fnet.AssertCalled(t, "RegisterHTTPHandler", LedgerServiceLedgerPath, ledgerService) ledgerService.Start() - require.Equal(t, int32(1), ledgerService.running) + require.Equal(t, int32(1), ledgerService.running.Load()) // Test GET 400 Bad Version String rr = httptest.NewRecorder() @@ -170,5 +170,5 @@ func TestLedgerService(t *testing.T) { // Test LedgerService Stopped ledgerService.Stop() - require.Equal(t, int32(0), ledgerService.running) + require.Equal(t, int32(0), ledgerService.running.Load()) } diff --git a/rpcs/txService_test.go b/rpcs/txService_test.go index dd999d6e65..8ef49e45a6 100644 --- a/rpcs/txService_test.go +++ b/rpcs/txService_test.go @@ -24,7 +24,6 @@ import ( "os" "strings" "sync" - "sync/atomic" "testing" "time" @@ -153,7 +152,7 @@ func TestTxSync(t *testing.T) { // Since syncer is not Started, set the context here syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) require.NoError(t, syncer.sync()) - require.Equal(t, int32(3), atomic.LoadInt32(&handler.messageCounter)) + require.Equal(t, int32(3), handler.messageCounter.Load()) } func BenchmarkTxSync(b *testing.B) { diff --git a/rpcs/txSyncer_test.go b/rpcs/txSyncer_test.go index b05e050ee2..43e85f4523 100644 --- a/rpcs/txSyncer_test.go +++ b/rpcs/txSyncer_test.go @@ -103,12 +103,12 @@ func (mock mockPendingTxAggregate) PendingTxGroups() [][]transactions.SignedTxn } type mockHandler struct { - messageCounter int32 + messageCounter atomic.Int32 err error } func (handler *mockHandler) Handle(txgroup []transactions.SignedTxn) error { - atomic.AddInt32(&handler.messageCounter, 1) + handler.messageCounter.Add(1) return handler.err } @@ -201,7 +201,7 @@ func TestSyncFromClient(t *testing.T) { syncer.log = logging.TestingLog(t) require.NoError(t, syncer.syncFromClient(&client)) - require.Equal(t, int32(1), atomic.LoadInt32(&handler.messageCounter)) + require.Equal(t, int32(1), handler.messageCounter.Load()) } func TestSyncFromUnsupportedClient(t *testing.T) { @@ -218,7 +218,7 @@ func TestSyncFromUnsupportedClient(t 
*testing.T) { syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } func TestSyncFromClientAndQuit(t *testing.T) { @@ -235,7 +235,7 @@ func TestSyncFromClientAndQuit(t *testing.T) { syncer.log = logging.TestingLog(t) syncer.cancel() require.Error(t, syncer.syncFromClient(&client)) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } func TestSyncFromClientAndError(t *testing.T) { @@ -251,7 +251,7 @@ func TestSyncFromClientAndError(t *testing.T) { syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } func TestSyncFromClientAndTimeout(t *testing.T) { @@ -268,7 +268,7 @@ func TestSyncFromClientAndTimeout(t *testing.T) { syncer.ctx, syncer.cancel = context.WithCancel(context.Background()) syncer.log = logging.TestingLog(t) require.Error(t, syncer.syncFromClient(&client)) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } func TestSync(t *testing.T) { @@ -292,7 +292,7 @@ func TestSync(t *testing.T) { syncer.log = logging.TestingLog(t) require.NoError(t, syncer.sync()) - require.Equal(t, int32(1), atomic.LoadInt32(&handler.messageCounter)) + require.Equal(t, int32(1), handler.messageCounter.Load()) } func TestNoClientsSync(t *testing.T) { @@ -307,7 +307,7 @@ func TestNoClientsSync(t *testing.T) { syncer.log = logging.TestingLog(t) require.NoError(t, syncer.sync()) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } func TestStartAndStop(t *testing.T) { @@ -335,22 +335,22 @@ func TestStartAndStop(t *testing.T) { canStart := make(chan struct{}) syncer.Start(canStart) time.Sleep(2 * time.Second) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) // signal that syncing can start close(canStart) for x := 0; x < 20; x++ { time.Sleep(100 * time.Millisecond) - if atomic.LoadInt32(&handler.messageCounter) != 0 { + if handler.messageCounter.Load() != 0 { break } } - require.Equal(t, int32(1), atomic.LoadInt32(&handler.messageCounter)) + require.Equal(t, int32(1), handler.messageCounter.Load()) // stop syncing and ensure it doesn't happen syncer.Stop() time.Sleep(2 * time.Second) - require.Equal(t, int32(1), atomic.LoadInt32(&handler.messageCounter)) + require.Equal(t, int32(1), handler.messageCounter.Load()) } func TestStartAndQuit(t *testing.T) { @@ -370,12 +370,12 @@ func TestStartAndQuit(t *testing.T) { canStart := make(chan struct{}) syncer.Start(canStart) time.Sleep(2 * time.Second) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) syncer.cancel() time.Sleep(50 * time.Millisecond) // signal that syncing can start, but ensure that it doesn't start (since we quit) close(canStart) time.Sleep(2 * time.Second) - require.Zero(t, atomic.LoadInt32(&handler.messageCounter)) + require.Zero(t, handler.messageCounter.Load()) } diff --git a/shared/pingpong/accounts.go b/shared/pingpong/accounts.go index 0471d84be4..3c34469f3b 100644 --- a/shared/pingpong/accounts.go +++ b/shared/pingpong/accounts.go @@ -209,10 +209,10 @@ func (pps *WorkerState) ensureAccounts(ac 
*libgoal.Client) (err error) { } ppa := &pingPongAccount{ - balance: amt, - sk: secret, - pk: accountAddress, + sk: secret, + pk: accountAddress, } + ppa.balance.Store(amt) pps.integrateAccountInfo(addr, ppa, ai) @@ -246,7 +246,7 @@ func (pps *WorkerState) ensureAccounts(ac *libgoal.Client) (err error) { } func (pps *WorkerState) integrateAccountInfo(addr string, ppa *pingPongAccount, ai model.Account) { - ppa.balance = ai.Amount + ppa.balance.Store(ai.Amount) // assets this account has created if ai.CreatedAssets != nil { for _, ap := range *ai.CreatedAssets { diff --git a/shared/pingpong/pingpong.go b/shared/pingpong/pingpong.go index a35fd451a4..95aeac0391 100644 --- a/shared/pingpong/pingpong.go +++ b/shared/pingpong/pingpong.go @@ -57,7 +57,7 @@ type CreatablesInfo struct { // pingPongAccount represents the account state for each account in the pingpong application // This includes the current balance and public/private keys tied to the account type pingPongAccount struct { - balance uint64 + balance atomic.Uint64 balanceRound uint64 deadlock.Mutex @@ -69,22 +69,22 @@ type pingPongAccount struct { } func (ppa *pingPongAccount) getBalance() uint64 { - return atomic.LoadUint64(&ppa.balance) + return ppa.balance.Load() } func (ppa *pingPongAccount) setBalance(balance uint64) { - atomic.StoreUint64(&ppa.balance, balance) + ppa.balance.Store(balance) } func (ppa *pingPongAccount) addBalance(offset int64) { if offset >= 0 { - atomic.AddUint64(&ppa.balance, uint64(offset)) + ppa.balance.Add(uint64(offset)) return } for { - v := atomic.LoadUint64(&ppa.balance) + v := ppa.balance.Load() nv := v - uint64(-offset) - done := atomic.CompareAndSwapUint64(&ppa.balance, v, nv) + done := ppa.balance.CompareAndSwap(v, nv) if done { return } @@ -118,7 +118,7 @@ func (ppa *pingPongAccount) String() string { ppa.Lock() defer ppa.Unlock() var ow strings.Builder - fmt.Fprintf(&ow, "%s %d", ppa.pk.String(), ppa.balance) + fmt.Fprintf(&ow, "%s %d", ppa.pk.String(), ppa.balance.Load()) if len(ppa.holdings) > 0 { fmt.Fprintf(&ow, "[") first := true @@ -1036,11 +1036,11 @@ type paymentUpdate struct { } func (au *paymentUpdate) apply(pps *WorkerState) { - pps.accounts[au.from].balance -= (au.fee + au.amt) + pps.accounts[au.from].balance.Add(-(au.fee + au.amt)) // update account balance to := pps.accounts[au.to] if to != nil { - to.balance += au.amt + to.balance.Add(au.amt) } } @@ -1164,7 +1164,7 @@ type assetUpdate struct { } func (au *assetUpdate) apply(pps *WorkerState) { - pps.accounts[au.from].balance -= au.fee + pps.accounts[au.from].balance.Add(-au.fee) pps.accounts[au.from].holdings[au.aidx] -= au.amt to := pps.accounts[au.to] if to.holdings == nil { @@ -1240,7 +1240,7 @@ type appUpdate struct { } func (au *appUpdate) apply(pps *WorkerState) { - pps.accounts[au.from].balance -= au.fee + pps.accounts[au.from].balance.Add(-au.fee) } func (pps *WorkerState) constructNFTGenTxn(from, to string, fee uint64, client *libgoal.Client, noteField []byte, lease [32]byte) (txn transactions.Transaction, sender string, update txnUpdate, err error) { @@ -1323,7 +1323,7 @@ type nftgenUpdate struct { } func (au *nftgenUpdate) apply(pps *WorkerState) { - pps.accounts[au.from].balance -= au.fee + pps.accounts[au.from].balance.Add(-au.fee) } func signTxn(signer *pingPongAccount, txn transactions.Transaction, cfg PpConfig) (stxn transactions.SignedTxn, err error) { diff --git a/util/condvar/timedwait.go b/util/condvar/timedwait.go index e14f2b33b7..7b275bfb3a 100644 --- a/util/condvar/timedwait.go +++ b/util/condvar/timedwait.go 
@@ -32,12 +32,12 @@ import ( // This function does not indicate whether a timeout occurred or not; // the caller should check time.Now() as needed. func TimedWait(c *sync.Cond, timeout time.Duration) { - var done int32 + var done atomic.Bool go func() { util.NanoSleep(timeout) - for atomic.LoadInt32(&done) == 0 { + for !done.Load() { c.Broadcast() // It is unlikely but possible that the parent @@ -49,5 +49,5 @@ func TimedWait(c *sync.Cond, timeout time.Duration) { }() c.Wait() - atomic.StoreInt32(&done, 1) + done.Store(true) } diff --git a/util/metrics/counter.go b/util/metrics/counter.go index 2efb52be52..db0c6e6863 100644 --- a/util/metrics/counter.go +++ b/util/metrics/counter.go @@ -20,7 +20,6 @@ import ( "math" "strconv" "strings" - "sync/atomic" "time" ) @@ -111,7 +110,7 @@ func (counter *Counter) AddMicrosecondsSince(t time.Time, labels map[string]stri // GetUint64Value returns the value of the counter. func (counter *Counter) GetUint64Value() (x uint64) { - return atomic.LoadUint64(&counter.intValue) + return counter.intValue.Load() } // GetUint64ValueForLabels returns the value of the counter for the given labels or 0 if it's not found. @@ -128,7 +127,7 @@ func (counter *Counter) GetUint64ValueForLabels(labels map[string]string) uint64 } func (counter *Counter) fastAddUint64(x uint64) { - if atomic.AddUint64(&counter.intValue, x) == x { + if counter.intValue.Add(x) == x { // What we just added is the whole value, this // is the first Add. Create a dummy // counterValue for the no-labels value. @@ -202,7 +201,7 @@ func (counter *Counter) WriteMetric(buf *strings.Builder, parentLabels string) { buf.WriteString("} ") value := l.counter if len(l.labels) == 0 { - value += atomic.LoadUint64(&counter.intValue) + value += counter.intValue.Load() } buf.WriteString(strconv.FormatUint(value, 10)) buf.WriteString("\n") @@ -221,7 +220,7 @@ func (counter *Counter) AddMetric(values map[string]float64) { for _, l := range counter.values { sum := l.counter if len(l.labels) == 0 { - sum += atomic.LoadUint64(&counter.intValue) + sum += counter.intValue.Load() } var suffix string if len(l.formattedLabels) > 0 { diff --git a/util/metrics/counterCommon.go b/util/metrics/counterCommon.go index 2a810ace69..dc187b3b48 100644 --- a/util/metrics/counterCommon.go +++ b/util/metrics/counterCommon.go @@ -17,14 +17,15 @@ package metrics import ( + "sync/atomic" + "github.com/algorand/go-deadlock" ) // Counter represent a single counter variable. type Counter struct { // Collects value for special fast-path with no labels through Inc(nil) AddUint64(x, nil) - // We want to make it on a 64-bit aligned address for ARM compiliers as it's being used by AddUint64 - intValue uint64 + intValue atomic.Uint64 deadlock.Mutex name string diff --git a/util/metrics/gauge.go b/util/metrics/gauge.go index ce203d47c0..593be0e9d5 100644 --- a/util/metrics/gauge.go +++ b/util/metrics/gauge.go @@ -24,7 +24,7 @@ import ( // Gauge represent a single gauge variable. 
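A hedged sketch of the done-flag pattern from the condvar.TimedWait hunk above, where the 0/1 int32 flag becomes an atomic.Bool. The sleep and broadcast cadence below are simplified stand-ins for the real util.NanoSleep-based loop.

// Illustrative sketch only; the timing details differ from the real TimedWait.
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

func timedWait(c *sync.Cond, timeout time.Duration) {
	var done atomic.Bool
	go func() {
		time.Sleep(timeout)
		// Keep broadcasting until the waiter has observed the wakeup and set done.
		for !done.Load() {
			c.Broadcast()
			time.Sleep(time.Millisecond)
		}
	}()
	c.Wait()
	done.Store(true)
}

func main() {
	c := sync.NewCond(&sync.Mutex{})
	c.L.Lock()
	timedWait(c, 10*time.Millisecond) // returns shortly after the timeout fires
	c.L.Unlock()
}

The metrics Counter above keeps its first-Add fast path: intValue.Add(x) == x still detects the very first increment, because Add returns the new value.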
type Gauge struct { - value uint64 + value atomic.Uint64 name string description string } @@ -59,12 +59,12 @@ func (gauge *Gauge) Deregister(reg *Registry) { // Add increases gauge by x func (gauge *Gauge) Add(x uint64) { - atomic.AddUint64(&gauge.value, x) + gauge.value.Add(x) } // Set sets gauge to x func (gauge *Gauge) Set(x uint64) { - atomic.StoreUint64(&gauge.value, x) + gauge.value.Store(x) } // WriteMetric writes the metric into the output stream @@ -82,14 +82,14 @@ func (gauge *Gauge) WriteMetric(buf *strings.Builder, parentLabels string) { buf.WriteString(parentLabels) } buf.WriteString("} ") - value := atomic.LoadUint64(&gauge.value) + value := gauge.value.Load() buf.WriteString(strconv.FormatUint(value, 10)) buf.WriteString("\n") } // AddMetric adds the metric into the map func (gauge *Gauge) AddMetric(values map[string]float64) { - value := atomic.LoadUint64(&gauge.value) + value := gauge.value.Load() values[sanitizeTelemetryName(gauge.name)] = float64(value) }
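A hedged sketch of the compare-and-swap retry loop that pingPongAccount.addBalance keeps (in the pingpong hunks earlier in this diff) for subtracting from an atomic.Uint64 balance. The account type below is a hypothetical stand-in.

// Illustrative sketch only; account stands in for pingPongAccount.
package main

import (
	"fmt"
	"sync/atomic"
)

type account struct {
	balance atomic.Uint64
}

func (a *account) addBalance(offset int64) {
	if offset >= 0 {
		a.balance.Add(uint64(offset))
		return
	}
	// Retry until the CAS applies the decrement against a balance that no other
	// goroutine changed in the meantime.
	for {
		v := a.balance.Load()
		nv := v - uint64(-offset)
		if a.balance.CompareAndSwap(v, nv) {
			return
		}
	}
}

func main() {
	var a account
	a.addBalance(100)
	a.addBalance(-40)
	fmt.Println(a.balance.Load()) // prints 60
}

Because unsigned addition wraps, a single a.balance.Add(uint64(offset)) would give the same result for negative offsets as well; the loop simply preserves the shape of the pre-existing code.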