Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: use atomic type #27030

Merged
merged 3 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package downloader
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -371,7 +370,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
continue
}
// If the pivot block is committed, signal header sync termination
if atomic.LoadInt32(&d.committed) == 1 {
if d.committed.Load() {
select {
case d.headerProcCh <- nil:
return nil
Expand Down
30 changes: 15 additions & 15 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type headerTask struct {
}

type Downloader struct {
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events

checkpoint uint64 // Checkpoint block number to enforce head against (e.g. snap sync)
Expand All @@ -122,9 +122,9 @@ type Downloader struct {

// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
synchronising int32
notified int32
committed int32
synchronising atomic.Bool
notified atomic.Bool
committed atomic.Bool
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.

// Channels
Expand Down Expand Up @@ -292,7 +292,7 @@ func (d *Downloader) Progress() ethereum.SyncProgress {

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
return atomic.LoadInt32(&d.synchronising) > 0
return d.synchronising.Load()
}

// RegisterPeer injects a new download peer into the set of block source to be
Expand Down Expand Up @@ -392,13 +392,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
return d.synchroniseMock(id, hash)
}
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
if !d.synchronising.CompareAndSwap(false, true) {
return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
defer d.synchronising.Store(false)

// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started")
}
if mode == SnapSync {
Expand Down Expand Up @@ -435,7 +435,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
defer d.Cancel() // No matter what, we can't leave the cancel channel open

// Atomically set the requested sync mode
atomic.StoreUint32(&d.mode, uint32(mode))
d.mode.Store(uint32(mode))

// Retrieve the origin peer and initiate the downloading process
var p *peerConnection
Expand All @@ -452,7 +452,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int,
}

func (d *Downloader) getMode() SyncMode {
return SyncMode(atomic.LoadUint32(&d.mode))
return SyncMode(d.mode.Load())
}

// syncWithPeer starts a block synchronization based on the hash chain from the
Expand Down Expand Up @@ -562,9 +562,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
}
}
d.committed = 1
d.committed.Store(true)
if mode == SnapSync && pivot.Number.Uint64() != 0 {
d.committed = 0
d.committed.Store(false)
}
if mode == SnapSync {
// Set the ancient data limitation. If we are running snap sync, all block
Expand Down Expand Up @@ -1128,7 +1128,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// If no more headers are inbound, notify the content fetchers and return
if len(headers) == 0 {
// Don't abort header fetches while the pivot is downloading
if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
if !d.committed.Load() && pivot <= from {
p.log.Debug("No headers, waiting for pivot commit")
select {
case <-time.After(fsHeaderContCheck):
Expand Down Expand Up @@ -1669,7 +1669,7 @@ func (d *Downloader) processSnapSyncContent() error {
results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
}
// Split around the pivot block and process the two sides via snap/full sync
if atomic.LoadInt32(&d.committed) == 0 {
if !d.committed.Load() {
latest := results[len(results)-1].Header
// If the height is above the pivot block by 2 sets, it means the pivot
// become stale in the network and it was garbage collected, move to a
Expand Down Expand Up @@ -1794,7 +1794,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
return err
}
atomic.StoreInt32(&d.committed, 1)
d.committed.Store(true)
return nil
}

Expand Down
24 changes: 12 additions & 12 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,9 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, testChainBase.blocks[1:])

// Wrap the importer to allow stepping
blocked, proceed := uint32(0), make(chan struct{})
blocked, proceed := atomic.Uint32{}, make(chan struct{})
s7v7nislands marked this conversation as resolved.
Show resolved Hide resolved
tester.downloader.chainInsertHook = func(results []*fetchResult) {
atomic.StoreUint32(&blocked, uint32(len(results)))
blocked.Store(uint32(len(results)))
<-proceed
}
// Start a synchronisation concurrently
Expand All @@ -505,7 +505,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
tester.downloader.queue.resultCache.lock.Lock()
{
cached = tester.downloader.queue.resultCache.countCompleted()
frozen = int(atomic.LoadUint32(&blocked))
frozen = int(blocked.Load())
retrieved = int(tester.chain.CurrentSnapBlock().Number.Uint64()) + 1
}
tester.downloader.queue.resultCache.lock.Unlock()
Expand All @@ -528,8 +528,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1)
}
// Permit the blocked blocks to import
if atomic.LoadUint32(&blocked) > 0 {
atomic.StoreUint32(&blocked, uint32(0))
if blocked.Load() > 0 {
blocked.Store(uint32(0))
proceed <- struct{}{}
}
}
Expand Down Expand Up @@ -786,12 +786,12 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
tester.newPeer("peer", protocol, chain.blocks[1:])

// Instrument the downloader to signal body requests
bodiesHave, receiptsHave := int32(0), int32(0)
bodiesHave, receiptsHave := atomic.Int32{}, atomic.Int32{}
s7v7nislands marked this conversation as resolved.
Show resolved Hide resolved
tester.downloader.bodyFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&bodiesHave, int32(len(headers)))
bodiesHave.Add(int32(len(headers)))
}
tester.downloader.receiptFetchHook = func(headers []*types.Header) {
atomic.AddInt32(&receiptsHave, int32(len(headers)))
receiptsHave.Add(int32(len(headers)))
}
// Synchronise with the peer and make sure all blocks were retrieved
if err := tester.sync("peer", nil, mode); err != nil {
Expand All @@ -811,11 +811,11 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
receiptsNeeded++
}
}
if int(bodiesHave) != bodiesNeeded {
t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded)
if int(bodiesHave.Load()) != bodiesNeeded {
t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave.Load(), bodiesNeeded)
}
if int(receiptsHave) != receiptsNeeded {
t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded)
if int(receiptsHave.Load()) != receiptsNeeded {
t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave.Load(), receiptsNeeded)
}
}

Expand Down
18 changes: 9 additions & 9 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type fetchRequest struct {
// fetchResult is a struct collecting partial results from data fetchers until
// all outstanding pieces complete and the result as a whole can be processed.
type fetchResult struct {
pending int32 // Flag telling what deliveries are outstanding
pending atomic.Int32 // Flag telling what deliveries are outstanding

Header *types.Header
Uncles []*types.Header
Expand All @@ -75,38 +75,38 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
Header: header,
}
if !header.EmptyBody() {
item.pending |= (1 << bodyType)
item.pending.Store(item.pending.Load() | (1 << bodyType))
} else if header.WithdrawalsHash != nil {
item.Withdrawals = make(types.Withdrawals, 0)
}
if fastSync && !header.EmptyReceipts() {
item.pending |= (1 << receiptType)
item.pending.Store(item.pending.Load() | (1 << receiptType))
}
return item
}

// SetBodyDone flags the body as finished.
func (f *fetchResult) SetBodyDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 {
atomic.AddInt32(&f.pending, -1)
if v := f.pending.Load(); (v & (1 << bodyType)) != 0 {
f.pending.Add(-1)
}
}

// AllDone checks if item is done.
func (f *fetchResult) AllDone() bool {
return atomic.LoadInt32(&f.pending) == 0
return f.pending.Load() == 0
}

// SetReceiptsDone flags the receipts as finished.
func (f *fetchResult) SetReceiptsDone() {
if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 {
atomic.AddInt32(&f.pending, -2)
if v := f.pending.Load(); (v & (1 << receiptType)) != 0 {
f.pending.Add(-2)
}
}

// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
v := atomic.LoadInt32(&f.pending)
v := f.pending.Load()
return v&(1<<kind) == 0
}

Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/resultstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type resultStore struct {
// Internal index of first non-completed entry, updated atomically when needed.
// If all items are complete, this will equal length(items), so
// *important* : is not safe to use for indexing without checking against length
indexIncomplete int32 // atomic access
indexIncomplete atomic.Int32

// throttleThreshold is the limit up to which we _want_ to fill the
// results. If blocks are large, we want to limit the results to less
Expand Down Expand Up @@ -146,7 +146,7 @@ func (r *resultStore) HasCompletedItems() bool {
func (r *resultStore) countCompleted() int {
// We iterate from the already known complete point, and see
// if any more has completed since last count
index := atomic.LoadInt32(&r.indexIncomplete)
index := r.indexIncomplete.Load()
for ; ; index++ {
if index >= int32(len(r.items)) {
break
Expand All @@ -156,7 +156,7 @@ func (r *resultStore) countCompleted() int {
break
}
}
atomic.StoreInt32(&r.indexIncomplete, index)
r.indexIncomplete.Store(index)
return int(index)
}

Expand All @@ -179,7 +179,7 @@ func (r *resultStore) GetCompleted(limit int) []*fetchResult {
}
// Advance the expected block number of the first cache entry
r.resultOffset += uint64(limit)
atomic.AddInt32(&r.indexIncomplete, int32(-limit))
r.indexIncomplete.Add(int32(-limit))

return results
}
Expand Down
24 changes: 12 additions & 12 deletions eth/downloader/skeleton_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ type skeletonTestPeer struct {

serve func(origin uint64) []*types.Header // Hook to allow custom responses

served uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding)
served atomic.Uint64 // Number of headers served by this peer
dropped atomic.Uint64 // Flag whether the peer was dropped (stop responding)
}

// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with.
Expand Down Expand Up @@ -113,7 +113,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
// Since skeleton test peer are in-memory mocks, dropping the does not make
// them inaccessible. As such, check a local `dropped` field to see if the
// peer has been dropped and should not respond any more.
if atomic.LoadUint64(&p.dropped) != 0 {
if p.dropped.Load() != 0 {
return nil, errors.New("peer already dropped")
}
// Skeleton sync retrieves batches of headers going backward without gaps.
Expand Down Expand Up @@ -161,7 +161,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
}
}
}
atomic.AddUint64(&p.served, uint64(len(headers)))
p.served.Add(uint64(len(headers)))

hashes := make([]common.Hash, len(headers))
for i, header := range headers {
Expand All @@ -182,7 +182,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
sink <- res
if err := <-res.Done; err != nil {
log.Warn("Skeleton test peer response rejected", "err", err)
atomic.AddUint64(&p.dropped, 1)
p.dropped.Add(1)
}
}()
return req, nil
Expand Down Expand Up @@ -817,7 +817,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
dropped := make(map[string]int)
drop := func(peer string) {
if p := peerset.Peer(peer); p != nil {
atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1)
p.peer.(*skeletonTestPeer).dropped.Add(1)
}
peerset.Unregister(peer)
dropped[peer]++
Expand Down Expand Up @@ -895,14 +895,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable {
var served uint64
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
served += peer.served.Load()
}
if served != tt.midserve {
t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
}
var drops uint64
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
drops += peer.dropped.Load()
}
if drops != tt.middrop {
t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
Expand Down Expand Up @@ -950,20 +950,20 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
if !tt.unpredictable {
served := uint64(0)
for _, peer := range tt.peers {
served += atomic.LoadUint64(&peer.served)
served += peer.served.Load()
}
if tt.newPeer != nil {
served += atomic.LoadUint64(&tt.newPeer.served)
served += tt.newPeer.served.Load()
}
if served != tt.endserve {
t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
}
drops := uint64(0)
for _, peer := range tt.peers {
drops += atomic.LoadUint64(&peer.dropped)
drops += peer.dropped.Load()
}
if tt.newPeer != nil {
drops += atomic.LoadUint64(&tt.newPeer.dropped)
drops += tt.newPeer.dropped.Load()
}
if drops != tt.enddrop {
t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
Expand Down