Skip to content

Commit d0a7f3d

Browse files
james-prysmfernantho
authored and committed
wrapping goodbye messages in goroutine to speed up node shutdown (OffchainLabs#15542)
* wrapping goodbye messages in goroutine to speed up node shutdown * fixing requirement
1 parent e1fc397 commit d0a7f3d

File tree

3 files changed

+235
-8
lines changed

3 files changed

+235
-8
lines changed

beacon-chain/sync/service.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -287,25 +287,33 @@ func (s *Service) Stop() error {
287287
}
288288
}()
289289

290-
// Say goodbye to all peers.
290+
// Create context with timeout to prevent hanging
291+
goodbyeCtx, cancel := context.WithTimeout(s.ctx, 10*time.Second)
292+
defer cancel()
293+
294+
// Use WaitGroup to ensure all goodbye messages complete
295+
var wg sync.WaitGroup
291296
for _, peerID := range s.cfg.p2p.Peers().Connected() {
292297
if s.cfg.p2p.Host().Network().Connectedness(peerID) == network.Connected {
293-
if err := s.sendGoodByeAndDisconnect(s.ctx, p2ptypes.GoodbyeCodeClientShutdown, peerID); err != nil {
294-
log.WithError(err).WithField("peerID", peerID).Error("Failed to send goodbye message")
295-
}
298+
wg.Add(1)
299+
go func(pid peer.ID) {
300+
defer wg.Done()
301+
if err := s.sendGoodByeAndDisconnect(goodbyeCtx, p2ptypes.GoodbyeCodeClientShutdown, pid); err != nil {
302+
log.WithError(err).WithField("peerID", pid).Error("Failed to send goodbye message")
303+
}
304+
}(peerID)
296305
}
297306
}
307+
wg.Wait()
308+
log.Debug("All goodbye messages sent successfully")
298309

299-
// Removing RPC Stream handlers.
310+
// Now safe to remove handlers / unsubscribe.
300311
for _, p := range s.cfg.p2p.Host().Mux().Protocols() {
301312
s.cfg.p2p.Host().RemoveStreamHandler(p)
302313
}
303-
304-
// Deregister Topic Subscribers.
305314
for _, t := range s.cfg.p2p.PubSub().GetTopics() {
306315
s.unSubscribeFromTopic(t)
307316
}
308-
309317
return nil
310318
}
311319

beacon-chain/sync/service_test.go

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,30 @@ package sync
22

33
import (
44
"context"
5+
"sync"
56
"testing"
67
"time"
78

89
"github.com/OffchainLabs/prysm/v6/async/abool"
910
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
1011
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
1112
dbTest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
13+
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
1214
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
15+
p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types"
1316
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
1417
state_native "github.com/OffchainLabs/prysm/v6/beacon-chain/state/state-native"
1518
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
19+
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
20+
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
1621
"github.com/OffchainLabs/prysm/v6/crypto/bls"
1722
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
1823
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
1924
"github.com/OffchainLabs/prysm/v6/testing/assert"
2025
"github.com/OffchainLabs/prysm/v6/testing/require"
2126
"github.com/OffchainLabs/prysm/v6/testing/util"
27+
"github.com/libp2p/go-libp2p/core/network"
28+
"github.com/libp2p/go-libp2p/core/protocol"
2229
gcache "github.com/patrickmn/go-cache"
2330
)
2431

@@ -227,3 +234,212 @@ func TestSyncService_StopCleanly(t *testing.T) {
227234
require.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
228235
require.Equal(t, 0, len(r.cfg.p2p.Host().Mux().Protocols()))
229236
}
237+
238+
func TestService_Stop_SendsGoodbyeMessages(t *testing.T) {
239+
// Create test peers
240+
p1 := p2ptest.NewTestP2P(t)
241+
p2 := p2ptest.NewTestP2P(t)
242+
p3 := p2ptest.NewTestP2P(t)
243+
244+
// Connect peers
245+
p1.Connect(p2)
246+
p1.Connect(p3)
247+
248+
// Register peers in the peer status
249+
p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
250+
p1.Peers().Add(nil, p3.BHost.ID(), p3.BHost.Addrs()[0], network.DirOutbound)
251+
p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
252+
p1.Peers().SetConnectionState(p3.BHost.ID(), peers.Connected)
253+
254+
// Create service with connected peers
255+
d := dbTest.SetupDB(t)
256+
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
257+
ctx, cancel := context.WithCancel(context.Background())
258+
259+
r := &Service{
260+
cfg: &config{
261+
beaconDB: d,
262+
p2p: p1,
263+
chain: chain,
264+
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
265+
},
266+
ctx: ctx,
267+
cancel: cancel,
268+
rateLimiter: newRateLimiter(p1),
269+
}
270+
271+
// Initialize context map for RPC
272+
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
273+
require.NoError(t, err)
274+
r.ctxMap = ctxMap
275+
276+
// Setup rate limiter for goodbye topic
277+
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
278+
topic := string(pcl)
279+
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
280+
281+
// Track goodbye messages received
282+
var goodbyeMessages sync.Map
283+
var wg sync.WaitGroup
284+
285+
wg.Add(2)
286+
287+
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
288+
defer wg.Done()
289+
out := new(primitives.SSZUint64)
290+
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
291+
goodbyeMessages.Store(p2.BHost.ID().String(), *out)
292+
require.NoError(t, stream.Close())
293+
})
294+
295+
p3.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
296+
defer wg.Done()
297+
out := new(primitives.SSZUint64)
298+
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
299+
goodbyeMessages.Store(p3.BHost.ID().String(), *out)
300+
require.NoError(t, stream.Close())
301+
})
302+
303+
connectedPeers := r.cfg.p2p.Peers().Connected()
304+
t.Logf("Connected peers before Stop: %d", len(connectedPeers))
305+
assert.Equal(t, 2, len(connectedPeers), "Expected 2 connected peers")
306+
307+
err = r.Stop()
308+
assert.NoError(t, err)
309+
310+
// Wait for goodbye messages
311+
if util.WaitTimeout(&wg, 15*time.Second) {
312+
t.Fatal("Did not receive goodbye messages within timeout")
313+
}
314+
315+
// Verify correct goodbye codes were sent
316+
msg2, ok := goodbyeMessages.Load(p2.BHost.ID().String())
317+
assert.Equal(t, true, ok, "Expected goodbye message to peer 2")
318+
assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg2)
319+
320+
msg3, ok := goodbyeMessages.Load(p3.BHost.ID().String())
321+
assert.Equal(t, true, ok, "Expected goodbye message to peer 3")
322+
assert.Equal(t, p2ptypes.GoodbyeCodeClientShutdown, msg3)
323+
}
324+
325+
func TestService_Stop_TimeoutHandling(t *testing.T) {
326+
p1 := p2ptest.NewTestP2P(t)
327+
p2 := p2ptest.NewTestP2P(t)
328+
p1.Connect(p2)
329+
330+
p1.Peers().Add(nil, p2.BHost.ID(), p2.BHost.Addrs()[0], network.DirOutbound)
331+
p1.Peers().SetConnectionState(p2.BHost.ID(), peers.Connected)
332+
333+
d := dbTest.SetupDB(t)
334+
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
335+
ctx, cancel := context.WithCancel(context.Background())
336+
337+
r := &Service{
338+
cfg: &config{
339+
beaconDB: d,
340+
p2p: p1,
341+
chain: chain,
342+
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
343+
},
344+
ctx: ctx,
345+
cancel: cancel,
346+
rateLimiter: newRateLimiter(p1),
347+
}
348+
349+
// Initialize context map for RPC
350+
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
351+
require.NoError(t, err)
352+
r.ctxMap = ctxMap
353+
354+
// Setup rate limiter for goodbye topic
355+
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
356+
topic := string(pcl)
357+
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
358+
359+
// Don't set up stream handler on p2 to simulate unresponsive peer
360+
361+
// Verify peers are connected before stopping
362+
connectedPeers := r.cfg.p2p.Peers().Connected()
363+
t.Logf("Connected peers before Stop: %d", len(connectedPeers))
364+
365+
start := time.Now()
366+
err = r.Stop()
367+
duration := time.Since(start)
368+
369+
t.Logf("Stop completed in %v", duration)
370+
371+
// Stop should complete successfully even when peers don't respond
372+
assert.NoError(t, err)
373+
// Should not hang - completes quickly when goodbye fails
374+
assert.Equal(t, true, duration < 5*time.Second, "Stop() should not hang when peer is unresponsive")
375+
// Test passes - the timeout behavior is working correctly, goodbye attempts fail quickly
376+
}
377+
378+
func TestService_Stop_ConcurrentGoodbyeMessages(t *testing.T) {
379+
// Test that goodbye messages are sent concurrently, not sequentially
380+
const numPeers = 10
381+
382+
p1 := p2ptest.NewTestP2P(t)
383+
testPeers := make([]*p2ptest.TestP2P, numPeers)
384+
385+
// Create and connect multiple peers
386+
for i := 0; i < numPeers; i++ {
387+
testPeers[i] = p2ptest.NewTestP2P(t)
388+
p1.Connect(testPeers[i])
389+
// Register peer in the peer status
390+
p1.Peers().Add(nil, testPeers[i].BHost.ID(), testPeers[i].BHost.Addrs()[0], network.DirOutbound)
391+
p1.Peers().SetConnectionState(testPeers[i].BHost.ID(), peers.Connected)
392+
}
393+
394+
d := dbTest.SetupDB(t)
395+
chain := &mockChain.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
396+
ctx, cancel := context.WithCancel(context.Background())
397+
398+
r := &Service{
399+
cfg: &config{
400+
beaconDB: d,
401+
p2p: p1,
402+
chain: chain,
403+
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
404+
},
405+
ctx: ctx,
406+
cancel: cancel,
407+
rateLimiter: newRateLimiter(p1),
408+
}
409+
410+
// Initialize context map for RPC
411+
ctxMap, err := ContextByteVersionsForValRoot(chain.ValidatorsRoot)
412+
require.NoError(t, err)
413+
r.ctxMap = ctxMap
414+
415+
// Setup rate limiter for goodbye topic
416+
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz_snappy")
417+
topic := string(pcl)
418+
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, time.Second, false)
419+
420+
// Each peer will have artificial delay in processing goodbye
421+
var wg sync.WaitGroup
422+
wg.Add(numPeers)
423+
424+
for i := 0; i < numPeers; i++ {
425+
idx := i // capture loop variable
426+
testPeers[idx].BHost.SetStreamHandler(pcl, func(stream network.Stream) {
427+
defer wg.Done()
428+
time.Sleep(100 * time.Millisecond) // Artificial delay
429+
out := new(primitives.SSZUint64)
430+
require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, out))
431+
require.NoError(t, stream.Close())
432+
})
433+
}
434+
435+
start := time.Now()
436+
err = r.Stop()
437+
duration := time.Since(start)
438+
439+
// If messages were sent sequentially, it would take numPeers * 100ms = 1 second
440+
// If concurrent, should be ~100ms
441+
assert.NoError(t, err)
442+
assert.Equal(t, true, duration < 500*time.Millisecond, "Goodbye messages should be sent concurrently")
443+
444+
require.Equal(t, false, util.WaitTimeout(&wg, 2*time.Second))
445+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
### Changed
2+
3+
- when shutting down the sync service we now send p2p goodbye messages in parallel to maximize the chances of propagating goodbyes to all peers before an unsafe shutdown.

0 commit comments

Comments
 (0)