diff --git a/op-e2e/setup.go b/op-e2e/setup.go
index 525434da1e2d3..c45c300a44c8c 100644
--- a/op-e2e/setup.go
+++ b/op-e2e/setup.go
@@ -679,7 +679,7 @@ func (sys *System) newMockNetPeer() (host.Host, error) {
 	_ = ps.AddPubKey(p, sk.GetPublic())
 
 	ds := sync.MutexWrap(ds.NewMapDatastore())
-	eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds)
+	eps, err := store.NewExtendedPeerstore(context.Background(), log.Root(), clock.SystemClock, ps, ds, 24*time.Hour)
 	if err != nil {
 		return nil, err
 	}
diff --git a/op-node/p2p/host.go b/op-node/p2p/host.go
index 9d136e9ce078e..628f430c51b6f 100644
--- a/op-node/p2p/host.go
+++ b/op-node/p2p/host.go
@@ -141,7 +141,8 @@ func (conf *Config) Host(log log.Logger, reporter metrics.Reporter, metrics Host
 		return nil, fmt.Errorf("failed to open peerstore: %w", err)
 	}
 
-	ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store)
+	peerScoreParams := conf.PeerScoringParams()
+	ps, err := store.NewExtendedPeerstore(context.Background(), log, clock.SystemClock, basePs, conf.Store, peerScoreParams.RetainScore)
 	if err != nil {
 		return nil, fmt.Errorf("failed to open extended peerstore: %w", err)
 	}
diff --git a/op-node/p2p/peer_scores_test.go b/op-node/p2p/peer_scores_test.go
index 9d08bd6cc3763..06c118e05a464 100644
--- a/op-node/p2p/peer_scores_test.go
+++ b/op-node/p2p/peer_scores_test.go
@@ -76,7 +76,7 @@ func getNetHosts(testSuite *PeerScoresTestSuite, ctx context.Context, n int) []h
 	log := testlog.Logger(testSuite.T(), log.LvlError)
 	for i := 0; i < n; i++ {
 		swarm := tswarm.GenSwarm(testSuite.T())
-		eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()))
+		eps, err := store.NewExtendedPeerstore(ctx, log, clock.SystemClock, swarm.Peerstore(), sync.MutexWrap(ds.NewMapDatastore()), 1*time.Hour)
 		netw := &customPeerstoreNetwork{swarm, eps}
 		require.NoError(testSuite.T(), err)
 		h := bhost.NewBlankHost(netw)
@@ -99,7 +99,7 @@ func newGossipSubs(testSuite *PeerScoresTestSuite, ctx context.Context, hosts [
 		dataStore := sync.MutexWrap(ds.NewMapDatastore())
 		peerStore, err := pstoreds.NewPeerstore(context.Background(), dataStore, pstoreds.DefaultOpts())
 		require.NoError(testSuite.T(), err)
-		extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore)
+		extPeerStore, err := store.NewExtendedPeerstore(context.Background(), logger, clock.SystemClock, peerStore, dataStore, 1*time.Hour)
 		require.NoError(testSuite.T(), err)
 
 		scorer := NewScorer(
diff --git a/op-node/p2p/store/extended.go b/op-node/p2p/store/extended.go
index 2137c9dd94291..89e3be8581bd9 100644
--- a/op-node/p2p/store/extended.go
+++ b/op-node/p2p/store/extended.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/ethereum-optimism/optimism/op-service/clock"
 	"github.com/ethereum/go-ethereum/log"
@@ -19,12 +20,12 @@ type extendedStore struct {
 	*ipBanBook
 }
 
-func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching) (ExtendedPeerstore, error) {
+func NewExtendedPeerstore(ctx context.Context, logger log.Logger, clock clock.Clock, ps peerstore.Peerstore, store ds.Batching, scoreRetention time.Duration) (ExtendedPeerstore, error) {
 	cab, ok := peerstore.GetCertifiedAddrBook(ps)
 	if !ok {
 		return nil, errors.New("peerstore should also be a certified address book")
 	}
-	sb, err := newScoreBook(ctx, logger, clock, store)
+	sb, err := newScoreBook(ctx, logger, clock, store, scoreRetention)
 	if err != nil {
 		return nil, fmt.Errorf("create scorebook: %w", err)
 	}
diff --git a/op-node/p2p/store/records_book.go b/op-node/p2p/store/records_book.go
index b8451a8fd1598..21e016374a3af 100644
--- a/op-node/p2p/store/records_book.go
+++ b/op-node/p2p/store/records_book.go
@@ -102,6 +102,9 @@ func (d *recordsBook[K, V]) deleteRecord(key K) error {
 
 func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
 	if val, ok := d.cache.Get(key); ok {
+		if d.hasExpired(val) {
+			return v, UnknownRecordErr
+		}
 		return val, nil
 	}
 	data, err := d.store.Get(d.ctx, d.dsKey(key))
@@ -114,6 +117,9 @@ func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
 	if err := v.UnmarshalBinary(data); err != nil {
 		return v, fmt.Errorf("invalid value for key %v: %w", key, err)
 	}
+	if d.hasExpired(v) {
+		return v, UnknownRecordErr
+	}
 	d.cache.Add(key, v)
 	return v, nil
 }
@@ -142,9 +148,9 @@ func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) error {
 }
 
 // prune deletes entries from the store that are older than the configured prune expiration.
-// Note that the expiry period is not a strict TTL. Entries that are eligible for deletion may still be present
-// either because the prune function hasn't yet run or because they are still preserved in the in-memory cache after
-// having been deleted from the database.
+// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
+// because they are still preserved in the in-memory cache after having been deleted from the database.
+// Such expired entries are filtered out in getRecord
 func (d *recordsBook[K, V]) prune() error {
 	results, err := d.store.Query(d.ctx, query.Query{
 		Prefix: d.dsBaseKey.String(),
@@ -168,7 +174,7 @@ func (d *recordsBook[K, V]) prune() error {
 		if err := v.UnmarshalBinary(result.Value); err != nil {
 			return err
 		}
-		if v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now()) {
+		if d.hasExpired(v) {
 			if pending > maxPruneBatchSize {
 				if err := batch.Commit(d.ctx); err != nil {
 					return err
@@ -191,6 +197,10 @@ func (d *recordsBook[K, V]) prune() error {
 	return nil
 }
 
+func (d *recordsBook[K, V]) hasExpired(v V) bool {
+	return v.LastUpdated().Add(d.recordExpiry).Before(d.clock.Now())
+}
+
 func (d *recordsBook[K, V]) Close() {
 	d.cancelFn()
 	d.bgTasks.Wait()
diff --git a/op-node/p2p/store/scorebook.go b/op-node/p2p/store/scorebook.go
index 90d1a67789931..af68662f55d4a 100644
--- a/op-node/p2p/store/scorebook.go
+++ b/op-node/p2p/store/scorebook.go
@@ -12,8 +12,7 @@ import (
 )
 
 const (
-	scoreCacheSize          = 100
-	scoreRecordExpiryPeriod = 24 * time.Hour
+	scoreCacheSize = 100
 )
 
 var scoresBase = ds.NewKey("/peers/scores")
@@ -56,8 +55,8 @@ func peerIDKey(id peer.ID) ds.Key {
 	return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(id)))
 }
 
-func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching) (*scoreBook, error) {
-	book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, scoreRecordExpiryPeriod, scoresBase, newScoreRecord, peerIDKey)
+func newScoreBook(ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, retain time.Duration) (*scoreBook, error) {
+	book, err := newRecordsBook[peer.ID, *scoreRecord](ctx, logger, clock, store, scoreCacheSize, retain, scoresBase, newScoreRecord, peerIDKey)
 	if err != nil {
 		return nil, err
 	}
diff --git a/op-node/p2p/store/scorebook_test.go b/op-node/p2p/store/scorebook_test.go
index a94b7a446e9d9..7f66aa741ccc2 100644
--- a/op-node/p2p/store/scorebook_test.go
+++ b/op-node/p2p/store/scorebook_test.go
@@ -81,7 +81,7 @@ func TestPrune(t *testing.T) {
 	logger := testlog.Logger(t, log.LvlInfo)
 	store := sync.MutexWrap(ds.NewMapDatastore())
 	clock := clock.NewDeterministicClock(time.UnixMilli(1000))
-	book, err := newScoreBook(ctx, logger, clock, store)
+	book, err := newScoreBook(ctx, logger, clock, store, 24*time.Hour)
 	require.NoError(t, err)
 
 	hasScoreRecorded := func(id peer.ID) bool {
@@ -135,7 +135,7 @@ func TestPruneMultipleBatches(t *testing.T) {
 	defer cancelFunc()
 	logger := testlog.Logger(t, log.LvlInfo)
 	clock := clock.NewDeterministicClock(time.UnixMilli(1000))
-	book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()))
+	book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), 24*time.Hour)
 	require.NoError(t, err)
 
 	hasScoreRecorded := func(id peer.ID) bool {
@@ -159,6 +159,31 @@ func TestPruneMultipleBatches(t *testing.T) {
 	}
 }
 
+// Check that scores that are eligible for pruning are not returned, even if they haven't yet been removed
+func TestIgnoreOutdatedScores(t *testing.T) {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	defer cancelFunc()
+	logger := testlog.Logger(t, log.LvlInfo)
+	clock := clock.NewDeterministicClock(time.UnixMilli(1000))
+	retentionPeriod := 24 * time.Hour
+	book, err := newScoreBook(ctx, logger, clock, sync.MutexWrap(ds.NewMapDatastore()), retentionPeriod)
+	require.NoError(t, err)
+
+	require.NoError(t, book.SetScore("a", &GossipScores{Total: 123.45}))
+	clock.AdvanceTime(retentionPeriod + 1)
+
+	// Not available from cache
+	scores, err := book.GetPeerScores("a")
+	require.NoError(t, err)
+	require.Equal(t, scores, PeerScores{})
+
+	book.book.cache.Purge()
+	// Not available from disk
+	scores, err = book.GetPeerScores("a")
+	require.NoError(t, err)
+	require.Equal(t, scores, PeerScores{})
+}
+
 func assertPeerScores(t *testing.T, store ExtendedPeerstore, id peer.ID, expected PeerScores) {
 	result, err := store.GetPeerScores(id)
 	require.NoError(t, err)
@@ -174,8 +199,8 @@ func createPeerstoreWithBacking(t *testing.T, store *sync.MutexDatastore) Extend
 	ps, err := pstoreds.NewPeerstore(context.Background(), store, pstoreds.DefaultOpts())
 	require.NoError(t, err, "Failed to create peerstore")
 	logger := testlog.Logger(t, log.LvlInfo)
-	clock := clock.NewDeterministicClock(time.UnixMilli(100))
-	eps, err := NewExtendedPeerstore(context.Background(), logger, clock, ps, store)
+	c := clock.NewDeterministicClock(time.UnixMilli(100))
+	eps, err := NewExtendedPeerstore(context.Background(), logger, c, ps, store, 24*time.Hour)
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		_ = eps.Close()
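
Reviewer note: the behavioral core of this diff is that reads now apply the same expiry predicate the background pruner uses, so a score past its retention window is reported as unknown even before prune() physically deletes it. The following is a minimal, self-contained Go sketch of that read-path semantics only; `record`, `book`, and `errUnknownRecord` are illustrative stand-ins rather than the actual op-node types, and the injected `now` function stands in for op-service's `clock.Clock`.

package main

import (
	"errors"
	"fmt"
	"time"
)

// record is an illustrative stand-in for the store's record type,
// exposing the time it was last written.
type record struct {
	lastUpdated time.Time
	score       float64
}

var errUnknownRecord = errors.New("unknown record")

// book is a simplified stand-in for recordsBook: stored records plus a
// retention period, mirroring the scoreRetention parameter added here.
type book struct {
	records   map[string]record
	retention time.Duration
	now       func() time.Time // injected clock, as in op-service
}

// hasExpired mirrors the predicate factored out in records_book.go:
// a record is expired once lastUpdated + retention is before "now".
func (b *book) hasExpired(r record) bool {
	return r.lastUpdated.Add(b.retention).Before(b.now())
}

// getRecord mirrors the new read-path behavior: expired entries are
// reported as unknown even if the pruner hasn't deleted them yet.
func (b *book) getRecord(key string) (record, error) {
	r, ok := b.records[key]
	if !ok || b.hasExpired(r) {
		return record{}, errUnknownRecord
	}
	return r, nil
}

func main() {
	start := time.Now()
	current := start
	b := &book{
		records:   map[string]record{"peerA": {lastUpdated: start, score: 123.45}},
		retention: 24 * time.Hour,
		now:       func() time.Time { return current },
	}

	if r, err := b.getRecord("peerA"); err == nil {
		fmt.Println("fresh read:", r.score) // fresh read: 123.45
	}

	// Advance past the retention period: the record is still stored,
	// but reads now treat it as unknown.
	current = start.Add(24*time.Hour + time.Nanosecond)
	if _, err := b.getRecord("peerA"); errors.Is(err, errUnknownRecord) {
		fmt.Println("expired read: unknown record")
	}
}

Filtering on read this way decouples correctness from pruning cadence: prune() can keep running on a coarse interval, and the LRU cache can retain stale entries, without either ever serving a score older than the retention window.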