diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go index 0378e8ab6ab..73e06e2a0c4 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk.go @@ -190,7 +190,7 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r f.lowestAvaiableBlock.Store(anchorState.Slot()) f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader) - f.dumpBeaconStateOnDisk(anchorState, anchorRoot) + f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true) return f } @@ -313,12 +313,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, BodyRoot: bodyRoot, }) - if newState.Slot()%dumpSlotFrequency == 0 { - if err := f.dumpBeaconStateOnDisk(newState, blockRoot); err != nil { - return nil, LogisticError, err - } - } - // Lastly add checkpoints to caches as well. f.currentJustifiedCheckpoints.Store(libcommon.Hash(blockRoot), newState.CurrentJustifiedCheckpoint().Copy()) f.finalizedCheckpoints.Store(libcommon.Hash(blockRoot), newState.FinalizedCheckpoint().Copy()) @@ -361,31 +355,37 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st blocksInTheWay := []*cltypes.SignedBeaconBlock{} // Use the parent root as a reverse iterator. currentIteratorRoot := blockRoot + var copyReferencedState *state.CachingBeaconState + var err error // try and find the point of recconection - for { + for copyReferencedState == nil { block, isSegmentPresent := f.getBlock(currentIteratorRoot) if !isSegmentPresent { // check if it is in the header bHeader, ok := f.GetHeader(currentIteratorRoot) if ok && bHeader.Slot%dumpSlotFrequency == 0 { - break + copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot) + if err != nil { + log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err) + copyReferencedState = nil + } + continue } log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot) return nil, nil } if block.Block.Slot%dumpSlotFrequency == 0 { - break + copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot) + if err != nil { + log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err) + } + if copyReferencedState != nil { + break + } } blocksInTheWay = append(blocksInTheWay, block) currentIteratorRoot = block.Block.ParentRoot } - copyReferencedState, err := f.readBeaconStateFromDisk(currentIteratorRoot) - if err != nil { - return nil, err - } - if copyReferencedState == nil { - return nil, ErrStateNotFound - } // Traverse the blocks from top to bottom. for i := len(blocksInTheWay) - 1; i >= 0; i-- { diff --git a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go index b11052a9c9f..0a5d58ac0a5 100644 --- a/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go +++ b/cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go @@ -104,7 +104,10 @@ func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *s } // dumpBeaconStateOnDisk dumps a beacon state on disk in ssz snappy format -func (f *forkGraphDisk) dumpBeaconStateOnDisk(bs *state.CachingBeaconState, blockRoot libcommon.Hash) (err error) { +func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *state.CachingBeaconState, forced bool) (err error) { + if !forced && bs.Slot()%dumpSlotFrequency != 0 { + return + } // Truncate and then grow the buffer to the size of the state. encodingSizeSSZ := bs.EncodingSizeSSZ() f.sszBuffer.Grow(encodingSizeSSZ) diff --git a/cl/phase1/forkchoice/fork_graph/interface.go b/cl/phase1/forkchoice/fork_graph/interface.go index 060e1de500b..33345bc52e5 100644 --- a/cl/phase1/forkchoice/fork_graph/interface.go +++ b/cl/phase1/forkchoice/fork_graph/interface.go @@ -54,4 +54,5 @@ type ForkGraph interface { GetPreviousPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error) GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error) GetCurrentPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error) + DumpBeaconStateOnDisk(blockRoot libcommon.Hash, state *state.CachingBeaconState, forced bool) error } diff --git a/cl/phase1/forkchoice/forkchoice.go b/cl/phase1/forkchoice/forkchoice.go index 47382f3d0c9..5adf97027e2 100644 --- a/cl/phase1/forkchoice/forkchoice.go +++ b/cl/phase1/forkchoice/forkchoice.go @@ -540,3 +540,11 @@ func (f *ForkChoiceStore) IsHeadOptimistic() bool { latestRoot := headState.LatestBlockHeader().Root return f.optimisticStore.IsOptimistic(latestRoot) } + +func (f *ForkChoiceStore) DumpBeaconStateOnDisk(bs *state.CachingBeaconState) error { + anchorRoot, err := bs.BlockRoot() + if err != nil { + return err + } + return f.forkGraph.DumpBeaconStateOnDisk(anchorRoot, bs, false) +} diff --git a/cl/phase1/network/services/constants.go b/cl/phase1/network/services/constants.go index cb545e9387e..8de33883449 100644 --- a/cl/phase1/network/services/constants.go +++ b/cl/phase1/network/services/constants.go @@ -29,8 +29,8 @@ const ( blobJobsIntervalTick = 5 * time.Millisecond singleAttestationIntervalTick = 10 * time.Millisecond attestationJobsIntervalTick = 100 * time.Millisecond - blockJobExpiry = 7 * time.Minute - blobJobExpiry = 7 * time.Minute + blockJobExpiry = 24 * time.Second + blobJobExpiry = 24 * time.Second attestationJobExpiry = 30 * time.Minute singleAttestationJobExpiry = 6 * time.Second ) diff --git a/cl/phase1/stages/clstages.go b/cl/phase1/stages/clstages.go index 09c0e3cbc4c..5ac55503bf7 100644 --- a/cl/phase1/stages/clstages.go +++ b/cl/phase1/stages/clstages.go @@ -663,6 +663,7 @@ func ConsensusClStages(ctx context.Context, if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(copiedHeadState, copiedHeadState.Slot(), 0); err != nil { logger.Warn("failed to produce and cache attestation data", "err", err) } + // Incement some stuff here preverifiedValidators := cfg.forkChoice.PreverifiedValidator(headState.FinalizedCheckpoint().BlockRoot()) preverifiedHistoricalSummary := cfg.forkChoice.PreverifiedHistoricalSummaries(headState.FinalizedCheckpoint().BlockRoot()) @@ -683,6 +684,10 @@ func ConsensusClStages(ctx context.Context, return fmt.Errorf("failed to hash ssz: %w", err) } + if err := cfg.forkChoice.DumpBeaconStateOnDisk(headState); err != nil { + return fmt.Errorf("failed to dump beacon state on disk: %w", err) + } + headEpoch := headSlot / cfg.beaconCfg.SlotsPerEpoch previous_duty_dependent_root, err := headState.GetBlockRootAtSlot((headEpoch-1)*cfg.beaconCfg.SlotsPerEpoch - 1) if err != nil { diff --git a/cl/sentinel/discovery.go b/cl/sentinel/discovery.go index 2fd0e43197e..4bf27821202 100644 --- a/cl/sentinel/discovery.go +++ b/cl/sentinel/discovery.go @@ -118,11 +118,9 @@ func (s *Sentinel) listenForPeers() { continue } - go func(peerInfo *peer.AddrInfo) { - if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil { - log.Trace("[Sentinel] Could not connect with peer", "err", err) - } - }(peerInfo) + if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil { + log.Trace("[Sentinel] Could not connect with peer", "err", err) + } } } diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index e27ad0d35cb..6526c8201d6 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -208,11 +208,10 @@ func (s *Sentinel) forkWatcher() { s.subManager.subscriptions.Range(func(key, value interface{}) bool { sub := value.(*GossipSubscription) s.subManager.unsubscribe(key.(string)) - newSub, err := s.SubscribeGossip(sub.gossip_topic, sub.expiration.Load().(time.Time)) + _, err := s.SubscribeGossip(sub.gossip_topic, sub.expiration.Load().(time.Time)) if err != nil { log.Warn("[Gossip] Failed to resubscribe to topic", "err", err) } - newSub.Listen() return true }) prevDigest = digest @@ -495,6 +494,24 @@ func (g *GossipManager) Close() { }) } +func (g *GossipManager) Start(ctx context.Context) { + go func() { + checkingInterval := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + return + case <-checkingInterval.C: + g.subscriptions.Range(func(key, value any) bool { + sub := value.(*GossipSubscription) + sub.checkIfTopicNeedsToEnabledOrDisabled() + return true + }) + } + } + }() +} + // GossipSubscription abstracts a gossip subscription to write decoded structs. type GossipSubscription struct { gossip_topic GossipTopic @@ -516,42 +533,32 @@ type GossipSubscription struct { closeOnce sync.Once } -func (sub *GossipSubscription) Listen() { - go func() { - var err error - checkingInterval := time.NewTicker(100 * time.Millisecond) - for { - select { - case <-sub.ctx.Done(): - return - case <-checkingInterval.C: - expirationTime := sub.expiration.Load().(time.Time) - if sub.subscribed.Load() && time.Now().After(expirationTime) { - sub.stopCh <- struct{}{} - sub.topic.Close() - sub.subscribed.Store(false) - log.Info("[Gossip] Unsubscribed from topic", "topic", sub.sub.Topic()) - sub.s.updateENROnSubscription(sub.sub.Topic(), false) - continue - } - if !sub.subscribed.Load() && time.Now().Before(expirationTime) { - sub.stopCh = make(chan struct{}, 3) - sub.sub, err = sub.topic.Subscribe() - if err != nil { - log.Warn("[Gossip] failed to begin topic subscription", "err", err) - time.Sleep(30 * time.Second) - continue - } - var sctx context.Context - sctx, sub.cf = context.WithCancel(sub.ctx) - go sub.run(sctx, sub.sub, sub.sub.Topic()) - sub.subscribed.Store(true) - sub.s.updateENROnSubscription(sub.sub.Topic(), true) - log.Info("[Gossip] Subscribed to topic", "topic", sub.sub.Topic()) - } - } +func (sub *GossipSubscription) checkIfTopicNeedsToEnabledOrDisabled() { + var err error + expirationTime := sub.expiration.Load().(time.Time) + if sub.subscribed.Load() && time.Now().After(expirationTime) { + sub.stopCh <- struct{}{} + sub.topic.Close() + sub.subscribed.Store(false) + log.Info("[Gossip] Unsubscribed from topic", "topic", sub.sub.Topic()) + sub.s.updateENROnSubscription(sub.sub.Topic(), false) + return + } + if !sub.subscribed.Load() && time.Now().Before(expirationTime) { + sub.stopCh = make(chan struct{}, 3) + sub.sub, err = sub.topic.Subscribe() + if err != nil { + log.Warn("[Gossip] failed to begin topic subscription", "err", err) + return } - }() + var sctx context.Context + sctx, sub.cf = context.WithCancel(sub.ctx) + go sub.run(sctx, sub.sub, sub.sub.Topic()) + sub.subscribed.Store(true) + sub.s.updateENROnSubscription(sub.sub.Topic(), true) + log.Info("[Gossip] Subscribed to topic", "topic", sub.sub.Topic()) + } + } func (sub *GossipSubscription) OverwriteSubscriptionExpiry(expiry time.Time) { diff --git a/cl/sentinel/sentinel.go b/cl/sentinel/sentinel.go index 5b77c60b322..e2bfe582f9e 100644 --- a/cl/sentinel/sentinel.go +++ b/cl/sentinel/sentinel.go @@ -303,6 +303,7 @@ func (s *Sentinel) Start() error { }, }) s.subManager = NewGossipManager(s.ctx) + s.subManager.Start(s.ctx) go s.listenForPeers() go s.forkWatcher() diff --git a/cl/sentinel/sentinel_gossip_test.go b/cl/sentinel/sentinel_gossip_test.go index 4b98cb8cf00..529ac23e12c 100644 --- a/cl/sentinel/sentinel_gossip_test.go +++ b/cl/sentinel/sentinel_gossip_test.go @@ -88,12 +88,9 @@ func TestSentinelGossipOnHardFork(t *testing.T) { require.NoError(t, err) defer sub1.Close() - sub1.Listen() - sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz, time.Unix(0, math.MaxInt64)) require.NoError(t, err) defer sub2.Close() - sub2.Listen() time.Sleep(200 * time.Millisecond) err = h.Connect(ctx, peer.AddrInfo{ diff --git a/cl/sentinel/service/start.go b/cl/sentinel/service/start.go index e2fc2a7a5f9..a6e7e5e9334 100644 --- a/cl/sentinel/service/start.go +++ b/cl/sentinel/service/start.go @@ -131,12 +131,10 @@ func createSentinel( } // now lets separately connect to the gossip topics. this joins the room - subscriber, err := sent.SubscribeGossip(v, getExpirationForTopic(v.Name)) // Listen forever. + _, err := sent.SubscribeGossip(v, getExpirationForTopic(v.Name)) // Listen forever. if err != nil { logger.Error("[Sentinel] failed to start sentinel", "err", err) } - // actually start the subscription, aka listening and sending packets to the sentinel recv channel - subscriber.Listen() } return sent, nil } diff --git a/cl/transition/impl/eth2/operations.go b/cl/transition/impl/eth2/operations.go index beebf702b21..0f0bf3c992d 100644 --- a/cl/transition/impl/eth2/operations.go +++ b/cl/transition/impl/eth2/operations.go @@ -816,11 +816,8 @@ func verifyAttestations( attestingIndicies [][]uint64, ) (bool, error) { indexedAttestations := make([]*cltypes.IndexedAttestation, 0, attestations.Len()) - commonBuffer := make([]byte, 8*2048) attestations.Range(func(idx int, a *solid.Attestation, _ int) bool { idxAttestations := state.GetIndexedAttestation(a, attestingIndicies[idx]) - idxAttestations.AttestingIndices.SetReusableHashBuffer(commonBuffer) - idxAttestations.HashSSZ() indexedAttestations = append(indexedAttestations, idxAttestations) return true })