From cdac097002cff47f79190208a9b098144c2a7c7b Mon Sep 17 00:00:00 2001 From: protolambda Date: Thu, 16 Mar 2023 15:20:36 +0100 Subject: [PATCH 1/6] op-node: generalize and improve alt-sync --- op-e2e/migration_test.go | 1 + op-e2e/setup.go | 14 ++- op-e2e/system_test.go | 26 +++--- op-node/flags/flags.go | 8 ++ op-node/node/client.go | 42 ++++----- op-node/node/config.go | 3 + op-node/node/node.go | 56 +++++++----- op-node/rollup/derive/engine_queue.go | 7 +- op-node/rollup/driver/driver.go | 15 +++- op-node/rollup/driver/state.go | 67 +++++++------- op-node/service.go | 1 + op-node/sources/sync_client.go | 120 +++++++++++++++++++------- 12 files changed, 235 insertions(+), 125 deletions(-) diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index 69bf616e65065..1def369c53a37 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -277,6 +277,7 @@ func TestMigration(t *testing.T) { L2EngineAddr: gethNode.HTTPAuthEndpoint(), L2EngineJWTSecret: testingJWTSecret, }, + L2Sync: &node.PreparedL2SyncEndpoint{Client: nil, TrustRPC: false}, Driver: driver.Config{ VerifierConfDepth: 0, SequencerConfDepth: 0, diff --git a/op-e2e/setup.go b/op-e2e/setup.go index fea710838265f..be83529142de4 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -193,6 +193,9 @@ type SystemConfig struct { // If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet. NonFinalizedProposals bool + + // Explicitly disable batcher, for tests that rely on unsafe L2 payloads + DisableBatcher bool } type System struct { @@ -417,6 +420,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { L2EngineAddr: l2EndpointConfig, L2EngineJWTSecret: cfg.JWTSecret, } + rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{ + Client: nil, + TrustRPC: false, + } } // Geth Clients @@ -606,8 +613,11 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { return nil, fmt.Errorf("failed to setup batch submitter: %w", err) } - if err := sys.BatchSubmitter.Start(); err != nil { - return nil, fmt.Errorf("unable to start batch submitter: %w", err) + // Batcher may be enabled later + if !sys.cfg.DisableBatcher { + if err := sys.BatchSubmitter.Start(); err != nil { + return nil, fmt.Errorf("unable to start batch submitter: %w", err) + } } return sys, nil diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 2ac82bbd9cd82..1e4170712693c 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -649,7 +649,7 @@ func TestSystemMockP2P(t *testing.T) { require.Contains(t, received, receiptVerif.BlockHash) } -// TestSystemMockP2P sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that +// TestSystemRPCAltSync sets up a L1 Geth node, a rollup node, and a L2 geth node and then confirms that // the nodes can sync L2 blocks before they are confirmed on L1. // // Test steps: @@ -660,24 +660,28 @@ func TestSystemMockP2P(t *testing.T) { // 6. Wait for the RPC sync method to grab the block from the sequencer over RPC and insert it into the verifier's unsafe chain. // 7. Wait for the verifier to sync the unsafe chain into the safe chain. // 8. Verify that the TX is included in the verifier's safe chain. -func TestSystemMockAltSync(t *testing.T) { +func TestSystemRPCAltSync(t *testing.T) { parallel(t) if !verboseGethNodes { log.Root().SetHandler(log.DiscardHandler()) } cfg := DefaultSystemConfig(t) - // slow down L1 blocks so we can see the L2 blocks arrive well before the L1 blocks do. - // Keep the seq window small so the L2 chain is started quick - cfg.DeployConfig.L1BlockTime = 10 + // the default is nil, but this may change in the future. + // This test must ensure the blocks are not synced via Gossip, but instead via the alt RPC based sync. + cfg.P2PTopology = nil + // Disable batcher, so there will not be any L1 data to sync from + cfg.DisableBatcher = true - var published, received []common.Hash + var published, received []string seqTracer, verifTracer := new(FnTracer), new(FnTracer) + // The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) { - published = append(published, payload.BlockHash) + published = append(published, payload.ID().String()) } + // Blocks are now received via the RPC based alt-sync method verifTracer.OnUnsafeL2PayloadFn = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) { - received = append(received, payload.BlockHash) + received = append(received, payload.ID().String()) } cfg.Nodes["sequencer"].Tracer = seqTracer cfg.Nodes["verifier"].Tracer = verifTracer @@ -687,8 +691,8 @@ func TestSystemMockAltSync(t *testing.T) { role: "sequencer", action: func(sCfg *SystemConfig, system *System) { rpc, _ := system.Nodes["sequencer"].Attach() // never errors - cfg.Nodes["verifier"].L2Sync = &rollupNode.L2SyncRPCConfig{ - Rpc: client.NewBaseRPCClient(rpc), + cfg.Nodes["verifier"].L2Sync = &rollupNode.PreparedL2SyncEndpoint{ + Client: client.NewBaseRPCClient(rpc), } }, }) @@ -726,7 +730,7 @@ func TestSystemMockAltSync(t *testing.T) { require.Equal(t, receiptSeq, receiptVerif) // Verify that the tx was received via RPC sync (P2P is disabled) - require.Contains(t, received, receiptVerif.BlockHash) + require.Contains(t, received, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String()) // Verify that everything that was received was published require.GreaterOrEqual(t, len(published), len(received)) diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 44f23ac3c9591..9c17cd762e41b 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -175,6 +175,13 @@ var ( EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"), Required: false, } + BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{ + Name: "l2.backup-unsafe-sync-rpc.trustrpc", + Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." + + "This does not include checks if the blockhash is part of the canonical chain.", + EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), + Required: false, + } ) var requiredFlags = []cli.Flag{ @@ -207,6 +214,7 @@ var optionalFlags = []cli.Flag{ HeartbeatMonikerFlag, HeartbeatURLFlag, BackupL2UnsafeSyncRPC, + BackupL2UnsafeSyncRPCTrustRPC, } // Flags contains the list of configuration options available to the binary. diff --git a/op-node/node/client.go b/op-node/node/client.go index d83a258fb1d29..73a2f7e4654ec 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -20,7 +20,9 @@ type L2EndpointSetup interface { } type L2SyncEndpointSetup interface { - Setup(ctx context.Context, log log.Logger) (cl client.RPC, err error) + // Setup a RPC client to another L2 node to sync L2 blocks from. + // It may return a nil client with nil error if RPC based sync is not enabled. + Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) Check() error } @@ -82,45 +84,45 @@ func (p *PreparedL2Endpoints) Setup(ctx context.Context, log log.Logger) (client // L2SyncEndpointConfig contains configuration for the fallback sync endpoint type L2SyncEndpointConfig struct { - // Address of the L2 RPC to use for backup sync + // Address of the L2 RPC to use for backup sync, may be empty if RPC alt-sync is disabled. L2NodeAddr string + TrustRPC bool } var _ L2SyncEndpointSetup = (*L2SyncEndpointConfig)(nil) -func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { +// Setup creates an RPC client to sync from. +// It will return nil without error if no sync method is configured. +func (cfg *L2SyncEndpointConfig) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { + if cfg.L2NodeAddr == "" { + return nil, false, nil + } l2Node, err := client.NewRPC(ctx, log, cfg.L2NodeAddr) if err != nil { - return nil, err + return nil, false, err } - return l2Node, nil + return l2Node, cfg.TrustRPC, nil } func (cfg *L2SyncEndpointConfig) Check() error { - if cfg.L2NodeAddr == "" { - return errors.New("empty L2 Node Address") - } - + // empty addr is valid, as it is optional. return nil } -type L2SyncRPCConfig struct { - // RPC endpoint to use for syncing - Rpc client.RPC +type PreparedL2SyncEndpoint struct { + // RPC endpoint to use for syncing, may be nil if RPC alt-sync is disabled. + Client client.RPC + TrustRPC bool } -var _ L2SyncEndpointSetup = (*L2SyncRPCConfig)(nil) +var _ L2SyncEndpointSetup = (*PreparedL2SyncEndpoint)(nil) -func (cfg *L2SyncRPCConfig) Setup(ctx context.Context, log log.Logger) (client.RPC, error) { - return cfg.Rpc, nil +func (cfg *PreparedL2SyncEndpoint) Setup(ctx context.Context, log log.Logger) (cl client.RPC, trust bool, err error) { + return cfg.Client, cfg.TrustRPC, nil } -func (cfg *L2SyncRPCConfig) Check() error { - if cfg.Rpc == nil { - return errors.New("rpc cannot be nil") - } - +func (cfg *PreparedL2SyncEndpoint) Check() error { return nil } diff --git a/op-node/node/config.go b/op-node/node/config.go index 8d95f79f360af..37533a868aa04 100644 --- a/op-node/node/config.go +++ b/op-node/node/config.go @@ -80,6 +80,9 @@ func (cfg *Config) Check() error { if err := cfg.L2.Check(); err != nil { return fmt.Errorf("l2 endpoint config error: %w", err) } + if err := cfg.L2Sync.Check(); err != nil { + return fmt.Errorf("sync config error: %w", err) + } if err := cfg.Rollup.Check(); err != nil { return fmt.Errorf("rollup config error: %w", err) } diff --git a/op-node/node/node.go b/op-node/node/node.go index 2c2e43521f692..8bfb4a68e1a0b 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -33,6 +33,7 @@ type OpNode struct { l1Source *sources.L1Client // L1 Client to fetch data from l2Driver *driver.Driver // L2 Engine to Sync l2Source *sources.EngineClient // L2 Execution Engine RPC bindings + rpcSync *sources.SyncClient // Alt-sync RPC client, optional (may be nil) server *rpcServer // RPC server hosting the rollup-node API p2pNode *p2p.NodeP2P // P2P node functionality p2pSigner p2p.Signer // p2p gogssip application messages will be signed with this signer @@ -86,6 +87,9 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger) if err := n.initL2(ctx, cfg, snapshotLog); err != nil { return err } + if err := n.initRPCSync(ctx, cfg); err != nil { + return err + } if err := n.initP2PSigner(ctx, cfg); err != nil { return err } @@ -197,29 +201,28 @@ func (n *OpNode) initL2(ctx context.Context, cfg *Config, snapshotLog log.Logger return err } - var syncClient *sources.SyncClient - // If the L2 sync config is present, use it to create a sync client - if cfg.L2Sync != nil { - if err := cfg.L2Sync.Check(); err != nil { - log.Info("L2 sync config is not present, skipping L2 sync client setup", "err", err) - } else { - rpcSyncClient, err := cfg.L2Sync.Setup(ctx, n.log) - if err != nil { - return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) - } + n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, n, n, n.log, snapshotLog, n.metrics) - // The sync client's RPC is always trusted - config := sources.SyncClientDefaultConfig(&cfg.Rollup, true) + return nil +} - syncClient, err = sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) - if err != nil { - return fmt.Errorf("failed to create sync client: %w", err) - } - } +func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error { + rpcSyncClient, trustRPC, err := cfg.L2Sync.Setup(ctx, n.log) + if err != nil { + return fmt.Errorf("failed to setup L2 execution-engine RPC client for backup sync: %w", err) + } + if rpcSyncClient == nil { // if no RPC client is configured to sync from, then don't add the RPC sync client + return nil } - n.l2Driver = driver.NewDriver(&cfg.Driver, &cfg.Rollup, n.l2Source, n.l1Source, syncClient, n, n.log, snapshotLog, n.metrics) + // The sync client's RPC is always trusted + config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC) + syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) + if err != nil { + return fmt.Errorf("failed to create sync client: %w", err) + } + n.rpcSync = syncClient return nil } @@ -292,11 +295,12 @@ func (n *OpNode) Start(ctx context.Context) error { } // If the backup unsafe sync client is enabled, start its event loop - if n.l2Driver.L2SyncCl != nil { - if err := n.l2Driver.L2SyncCl.Start(); err != nil { + if n.rpcSync != nil { + if err := n.rpcSync.Start(); err != nil { n.log.Error("Could not start the backup sync client", "err", err) return err } + n.log.Info("Started L2-RPC sync service") } return nil @@ -375,6 +379,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e return nil } +func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error { + if n.rpcSync != nil { + return n.rpcSync.RequestL2Range(ctx, start, end) + } + n.log.Debug("ignoring request to sync L2 range, no sync method available") + return nil +} + func (n *OpNode) P2P() p2p.Node { return n.p2pNode } @@ -413,8 +425,8 @@ func (n *OpNode) Close() error { } // If the L2 sync client is present & running, close it. - if n.l2Driver.L2SyncCl != nil { - if err := n.l2Driver.L2SyncCl.Close(); err != nil { + if n.rpcSync != nil { + if err := n.rpcSync.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close L2 engine backup sync client cleanly: %w", err)) } } diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 91a26cc46bdf7..a9d49abc58364 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -682,7 +682,8 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System return io.EOF } -// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. +// GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end) +// of the gap between the tip of the unsafe priority queue and the unsafe head. // If there is no gap, the difference between end and start will be 0. func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) { // The start of the gap is always the unsafe head + 1 @@ -691,9 +692,11 @@ func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, e // If the priority queue is empty, the end is the first block number at the top of the priority queue // Otherwise, the end is the expected block number if first := eq.unsafePayloads.Peek(); first != nil { + // Don't include the payload we already have in the sync range end = first.ID().Number } else { - end = expectedNumber + // Include the expected payload in the sync range + end = expectedNumber + 1 } return start, end diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index d94eada3cd347..f3c927e1a15ef 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - "github.com/ethereum-optimism/optimism/op-node/sources" ) type Metrics interface { @@ -82,8 +81,18 @@ type Network interface { PublishL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error } +type AltSync interface { + // RequestL2Range informs the sync source that the given range of L2 blocks is missing, + // and should be retrieved from any available alternative syncing source. + // The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method. + // The latest requested range should always take priority over previous requests. + // There may be overlaps in requested ranges. + // An error may be returned if the scheduling fails immediately, e.g. a context timeout. + RequestL2Range(ctx context.Context, start, end uint64) error +} + // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. -func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, syncClient *sources.SyncClient, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { +func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, altSync AltSync, network Network, log log.Logger, snapshotLog log.Logger, metrics Metrics) *Driver { l1State := NewL1State(log, metrics) sequencerConfDepth := NewConfDepth(driverCfg.SequencerConfDepth, l1State.L1Head, l1) findL1Origin := NewL1OriginSelector(log, cfg, sequencerConfDepth) @@ -115,6 +124,6 @@ func NewDriver(driverCfg *Config, cfg *rollup.Config, l2 L2Chain, l1 L1Chain, sy l1SafeSig: make(chan eth.L1BlockRef, 10), l1FinalizedSig: make(chan eth.L1BlockRef, 10), unsafeL2Payloads: make(chan *eth.ExecutionPayload, 10), - L2SyncCl: syncClient, + altSync: altSync, } } diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 1387c9136b192..87a1b5a2bc92a 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -16,7 +16,6 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-service/backoff" ) @@ -64,8 +63,8 @@ type Driver struct { l1SafeSig chan eth.L1BlockRef l1FinalizedSig chan eth.L1BlockRef - // Backup unsafe sync client - L2SyncCl *sources.SyncClient + // Interface to signal the L2 block range to sync. + altSync AltSync // L2 Signals: @@ -200,11 +199,12 @@ func (s *Driver) eventLoop() { sequencerTimer.Reset(delay) } - // Create a ticker to check if there is a gap in the engine queue every 15 seconds - // If there is, we send requests to the backup RPC to retrieve the missing payloads - // and add them to the unsafe queue. - altSyncTicker := time.NewTicker(15 * time.Second) + // Create a ticker to check if there is a gap in the engine queue, whenever + // If there is, we send requests to sync source to retrieve the missing payloads. + syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2 + altSyncTicker := time.NewTicker(syncCheckInterval) defer altSyncTicker.Stop() + lastUnsafeL2 := s.derivation.UnsafeL2Head() for { // If we are sequencing, and the L1 state is ready, update the trigger for the next sequencer action. @@ -220,6 +220,13 @@ func (s *Driver) eventLoop() { sequencerCh = nil } + // If the engine is not ready, or if the L2 head is actively changing, then reset the alt-sync: + // there is no need to request L2 blocks when we are syncing already. + if head := s.derivation.UnsafeL2Head(); head != lastUnsafeL2 || !s.derivation.EngineReady() { + lastUnsafeL2 = head + altSyncTicker.Reset(syncCheckInterval) + } + select { case <-sequencerCh: payload, err := s.sequencer.RunNextSequencerAction(ctx) @@ -237,10 +244,12 @@ func (s *Driver) eventLoop() { } planSequencerAction() // schedule the next sequencer action to keep the sequencing looping case <-altSyncTicker.C: - // Check if there is a gap in the current unsafe payload queue. If there is, attempt to fetch - // missing payloads from the backup RPC (if it is configured). - if s.L2SyncCl != nil { - s.checkForGapInUnsafeQueue(ctx) + // Check if there is a gap in the current unsafe payload queue. + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + err := s.checkForGapInUnsafeQueue(ctx) + cancel() + if err != nil { + s.log.Warn("failed to check for unsafe L2 blocks to sync", "err", err) } case payload := <-s.unsafeL2Payloads: s.snapshot("New unsafe payload") @@ -462,35 +471,29 @@ type hashAndErrorChannel struct { err chan error } -// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from the backup RPC. -// WARNING: The sync client's attempt to retrieve the missing payloads is not guaranteed to succeed, and it will fail silently (besides -// emitting warning logs) if the requests fail. -func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) { +// checkForGapInUnsafeQueue checks if there is a gap in the unsafe queue and attempts to retrieve the missing payloads from an alt-sync method. +// WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved. +// Results are received through OnUnsafeL2Payload. +func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error { // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that // difference by the block time to get the expected L2 block number at the current time. If the // unsafe head does not have this block number, then there is a gap in the queue. wallClock := uint64(time.Now().Unix()) genesisTimestamp := s.config.Genesis.L2Time + if wallClock < genesisTimestamp { + s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp) + return nil + } wallClockGenesisDiff := wallClock - genesisTimestamp - expectedL2Block := wallClockGenesisDiff / s.config.BlockTime + // Note: round down, we should not request blocks into the future. + blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime + expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block) - size := end - start - // Check if there is a gap between the unsafe head and the expected L2 block number at the current time. - if size > 0 { - s.log.Warn("Gap in payload queue tip and expected unsafe chain detected", "start", start, "end", end, "size", size) - s.log.Info("Attempting to fetch missing payloads from backup RPC", "start", start, "end", end, "size", size) - - // Attempt to fetch the missing payloads from the backup unsafe sync RPC concurrently. - // Concurrent requests are safe here due to the engine queue being a priority queue. - for blockNumber := start; blockNumber <= end; blockNumber++ { - select { - case s.L2SyncCl.FetchUnsafeBlock <- blockNumber: - // Do nothing- the block number was successfully sent into the channel - default: - return // If the channel is full, return and wait for the next iteration of the event loop - } - } + if end > start { + s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start) + return s.altSync.RequestL2Range(ctx, start, end) } + return nil } diff --git a/op-node/service.go b/op-node/service.go index 48848babc76b5..0656affac6f70 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -136,6 +136,7 @@ func NewL2EndpointConfig(ctx *cli.Context, log log.Logger) (*node.L2EndpointConf func NewL2SyncEndpointConfig(ctx *cli.Context) *node.L2SyncEndpointConfig { return &node.L2SyncEndpointConfig{ L2NodeAddr: ctx.GlobalString(flags.BackupL2UnsafeSyncRPC.Name), + TrustRPC: ctx.GlobalBool(flags.BackupL2UnsafeSyncRPCTrustRPC.Name), } } diff --git a/op-node/sources/sync_client.go b/op-node/sources/sync_client.go index 6c5aa6dccf9db..b6e41c05d2d9c 100644 --- a/op-node/sources/sync_client.go +++ b/op-node/sources/sync_client.go @@ -3,7 +3,10 @@ package sources import ( "context" "errors" + "fmt" + "io" "sync" + "time" "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -18,23 +21,34 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not // RpcSyncPeer is a mock PeerID for the RPC sync client. var RpcSyncPeer peer.ID = "ALT_RPC_SYNC" +// Limit the maximum range to request at a time. +const maxRangePerWorker = 10 + type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error -type SyncClientInterface interface { +type syncRequest struct { + start, end uint64 +} + +type RPCSync interface { + io.Closer + // Start starts an additional worker syncing job Start() error - Close() error - fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) + // RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface. + RequestL2Range(ctx context.Context, start, end uint64) error } type SyncClient struct { *L2Client - FetchUnsafeBlock chan uint64 - done chan struct{} - receivePayload receivePayload - wg sync.WaitGroup -} -var _ SyncClientInterface = (*SyncClient)(nil) + requests chan syncRequest + + resCtx context.Context + resCancel context.CancelFunc + + receivePayload receivePayload + wg sync.WaitGroup +} type SyncClientConfig struct { L2ClientConfig @@ -51,30 +65,58 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m if err != nil { return nil, err } - + // This resource context is shared between all workers that may be started + resCtx, resCancel := context.WithCancel(context.Background()) return &SyncClient{ - L2Client: l2Client, - FetchUnsafeBlock: make(chan uint64, 128), - done: make(chan struct{}), - receivePayload: receiver, + L2Client: l2Client, + resCtx: resCtx, + resCancel: resCancel, + requests: make(chan syncRequest, 128), + receivePayload: receiver, }, nil } -// Start starts up the state loop. -// The loop will have been started if err is not nil. +// Start starts the syncing background work. This may not be called after Close(). func (s *SyncClient) Start() error { + // TODO(CLI-3635): we can start multiple event loop runners as workers, to parallelize the work s.wg.Add(1) go s.eventLoop() return nil } -// Close sends a signal to the event loop to stop. +// Close sends a signal to close all concurrent syncing work. func (s *SyncClient) Close() error { - s.done <- struct{}{} + s.resCancel() s.wg.Wait() return nil } +func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error { + // Drain previous requests now that we have new information + for len(s.requests) > 0 { + <-s.requests + } + + // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method. + + s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start) + + for i := start; i < end; i += maxRangePerWorker { + r := syncRequest{start: i, end: i + maxRangePerWorker} + if r.end > end { + r.end = end + } + s.log.Info("Scheduling range request", "start", r.start, "end", r.end, "size", r.end-r.start) + // schedule new range to be requested + select { + case s.requests <- r: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} + // eventLoop is the main event loop for the sync client. func (s *SyncClient) eventLoop() { defer s.wg.Done() @@ -82,10 +124,28 @@ func (s *SyncClient) eventLoop() { for { select { - case <-s.done: + case <-s.resCtx.Done(): + s.log.Debug("Shutting down RPC sync worker") return - case blockNumber := <-s.FetchUnsafeBlock: - s.fetchUnsafeBlockFromRpc(context.Background(), blockNumber) + case r := <-s.requests: + // Limit the maximum time for fetching payloads + ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10) + // We are only fetching one block at a time here. + err := s.fetchUnsafeBlockFromRpc(ctx, r.start) + cancel() + if err != nil { + s.log.Error("failed syncing L2 block via RPC", "err", err) + } else { + r.start += 1 // continue with next block + } + // Reschedule + if r.start < r.end { + select { + case s.requests <- r: + default: + // drop syncing job if we are too busy with sync jobs already. + } + } } } } @@ -95,28 +155,22 @@ func (s *SyncClient) eventLoop() { // // Post Shanghai hardfork, the engine API's `PayloadBodiesByRange` method will be much more efficient, but for now, // the `eth_getBlockByNumber` method is more widely available. -func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) { +func (s *SyncClient) fetchUnsafeBlockFromRpc(ctx context.Context, blockNumber uint64) error { s.log.Info("Requesting unsafe payload from backup RPC", "block number", blockNumber) payload, err := s.PayloadByNumber(ctx, blockNumber) if err != nil { - s.log.Warn("Failed to convert block to execution payload", "block number", blockNumber, "err", err) - return - } - - // Signature validation is not necessary here since the backup RPC is trusted. - if _, ok := payload.CheckBlockHash(); !ok { - s.log.Warn("Received invalid payload from backup RPC; invalid block hash", "payload", payload.ID()) - return + return fmt.Errorf("failed to fetch payload by number (%d): %w", blockNumber, err) } + // Note: the underlying RPC client used for syncing verifies the execution payload blockhash, if set to untrusted. s.log.Info("Received unsafe payload from backup RPC", "payload", payload.ID()) // Send the retrieved payload to the `unsafeL2Payloads` channel. if err = s.receivePayload(ctx, RpcSyncPeer, payload); err != nil { - s.log.Warn("Failed to send payload into the driver's unsafeL2Payloads channel", "payload", payload.ID(), "err", err) - return + return fmt.Errorf("failed to send payload %s into the driver's unsafeL2Payloads channel: %w", payload.ID(), err) } else { - s.log.Info("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID()) + s.log.Debug("Sent received payload into the driver's unsafeL2Payloads channel", "payload", payload.ID()) + return nil } } From 319279334f630ffb1f75b6376176a642e0a164a8 Mon Sep 17 00:00:00 2001 From: protolambda Date: Tue, 21 Mar 2023 00:40:21 +0100 Subject: [PATCH 2/6] op-node: RPC alt-sync comment review fixes Co-authored-by: Adrian Sutton --- op-node/node/node.go | 1 - op-node/rollup/driver/driver.go | 1 + op-node/rollup/driver/state.go | 4 ++-- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/op-node/node/node.go b/op-node/node/node.go index 8bfb4a68e1a0b..d9b38c0166b1b 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -215,7 +215,6 @@ func (n *OpNode) initRPCSync(ctx context.Context, cfg *Config) error { return nil } - // The sync client's RPC is always trusted config := sources.SyncClientDefaultConfig(&cfg.Rollup, trustRPC) syncClient, err := sources.NewSyncClient(n.OnUnsafeL2Payload, rpcSyncClient, n.log, n.metrics.L2SourceCache, config) diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index f3c927e1a15ef..2881d6aa74e5a 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -84,6 +84,7 @@ type Network interface { type AltSync interface { // RequestL2Range informs the sync source that the given range of L2 blocks is missing, // and should be retrieved from any available alternative syncing source. + // The start of the range is inclusive, the end is exclusive. // The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method. // The latest requested range should always take priority over previous requests. // There may be overlaps in requested ranges. diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 87a1b5a2bc92a..5e0dbceb13803 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -199,8 +199,8 @@ func (s *Driver) eventLoop() { sequencerTimer.Reset(delay) } - // Create a ticker to check if there is a gap in the engine queue, whenever - // If there is, we send requests to sync source to retrieve the missing payloads. + // Create a ticker to check if there is a gap in the engine queue. Whenever + // there is, we send requests to sync source to retrieve the missing payloads. syncCheckInterval := time.Duration(s.config.BlockTime) * time.Second * 2 altSyncTicker := time.NewTicker(syncCheckInterval) defer altSyncTicker.Stop() From fbe13302d0a52692a9e2c20a3392680d9abbce4a Mon Sep 17 00:00:00 2001 From: protolambda Date: Tue, 21 Mar 2023 20:22:02 +0100 Subject: [PATCH 3/6] op-node: schedule by block-number, re-attempt RPC alt-sync requests with backoff --- op-node/sources/sync_client.go | 58 ++++++++++++++++------------------ 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/op-node/sources/sync_client.go b/op-node/sources/sync_client.go index b6e41c05d2d9c..f7513a2ecef42 100644 --- a/op-node/sources/sync_client.go +++ b/op-node/sources/sync_client.go @@ -12,6 +12,8 @@ import ( "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/sources/caching" + "github.com/ethereum-optimism/optimism/op-service/backoff" + "github.com/ethereum/go-ethereum/log" "github.com/libp2p/go-libp2p/core/peer" ) @@ -21,15 +23,10 @@ var ErrNoUnsafeL2PayloadChannel = errors.New("unsafeL2Payloads channel must not // RpcSyncPeer is a mock PeerID for the RPC sync client. var RpcSyncPeer peer.ID = "ALT_RPC_SYNC" -// Limit the maximum range to request at a time. -const maxRangePerWorker = 10 - +// receivePayload queues the received payload for processing. +// This may return an error if there's no capacity for the payload. type receivePayload = func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error -type syncRequest struct { - start, end uint64 -} - type RPCSync interface { io.Closer // Start starts an additional worker syncing job @@ -41,7 +38,7 @@ type RPCSync interface { type SyncClient struct { *L2Client - requests chan syncRequest + requests chan uint64 resCtx context.Context resCancel context.CancelFunc @@ -71,7 +68,7 @@ func NewSyncClient(receiver receivePayload, client client.RPC, log log.Logger, m L2Client: l2Client, resCtx: resCtx, resCancel: resCancel, - requests: make(chan syncRequest, 128), + requests: make(chan uint64, 128), receivePayload: receiver, }, nil } @@ -101,15 +98,9 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start) - for i := start; i < end; i += maxRangePerWorker { - r := syncRequest{start: i, end: i + maxRangePerWorker} - if r.end > end { - r.end = end - } - s.log.Info("Scheduling range request", "start", r.start, "end", r.end, "size", r.end-r.start) - // schedule new range to be requested + for i := start; i < end; i++ { select { - case s.requests <- r: + case s.requests <- i: case <-ctx.Done(): return ctx.Err() } @@ -122,26 +113,33 @@ func (s *SyncClient) eventLoop() { defer s.wg.Done() s.log.Info("Starting sync client event loop") + backoffStrategy := &backoff.ExponentialStrategy{ + Min: 1000, + Max: 20_000, + MaxJitter: 250, + } + for { select { case <-s.resCtx.Done(): s.log.Debug("Shutting down RPC sync worker") return - case r := <-s.requests: - // Limit the maximum time for fetching payloads - ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10) - // We are only fetching one block at a time here. - err := s.fetchUnsafeBlockFromRpc(ctx, r.start) - cancel() - if err != nil { - s.log.Error("failed syncing L2 block via RPC", "err", err) - } else { - r.start += 1 // continue with next block + case reqNum := <-s.requests: + err := backoff.DoCtx(s.resCtx, 5, backoffStrategy, func() error { + // Limit the maximum time for fetching payloads + ctx, cancel := context.WithTimeout(s.resCtx, time.Second*10) + defer cancel() + // We are only fetching one block at a time here. + return s.fetchUnsafeBlockFromRpc(ctx, reqNum) + }) + if err == s.resCtx.Err() { + return } - // Reschedule - if r.start < r.end { + if err != nil { + s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum) + // Reschedule at end of queue select { - case s.requests <- r: + case s.requests <- reqNum: default: // drop syncing job if we are too busy with sync jobs already. } From a7c3e83232a1df9636a76ad9684624b9dfcbc19e Mon Sep 17 00:00:00 2001 From: protolambda Date: Tue, 21 Mar 2023 23:01:42 +0100 Subject: [PATCH 4/6] op-node: fix sync-client ctx error check, and handle concurrent requests read better --- op-node/sources/sync_client.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/op-node/sources/sync_client.go b/op-node/sources/sync_client.go index f7513a2ecef42..8b1c7ae71f10b 100644 --- a/op-node/sources/sync_client.go +++ b/op-node/sources/sync_client.go @@ -91,7 +91,11 @@ func (s *SyncClient) Close() error { func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error { // Drain previous requests now that we have new information for len(s.requests) > 0 { - <-s.requests + select { // in case requests is being read at the same time, don't block on draining it. + case <-s.requests: + default: + break + } } // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method. @@ -132,10 +136,10 @@ func (s *SyncClient) eventLoop() { // We are only fetching one block at a time here. return s.fetchUnsafeBlockFromRpc(ctx, reqNum) }) - if err == s.resCtx.Err() { - return - } if err != nil { + if err == s.resCtx.Err() { + return + } s.log.Error("failed syncing L2 block via RPC", "err", err, "num", reqNum) // Reschedule at end of queue select { From 7c60017fbf1318a46fcc174f131b305b5a1b7f6d Mon Sep 17 00:00:00 2001 From: protolambda Date: Wed, 22 Mar 2023 16:51:28 +0100 Subject: [PATCH 5/6] op-node: handle duplicates in payloads-queue by hash, not by number --- op-node/rollup/derive/engine_queue.go | 22 ++++++-------- op-node/rollup/derive/payloads_queue.go | 30 ++++++++++++++------ op-node/rollup/derive/payloads_queue_test.go | 6 +--- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index a9d49abc58364..8b63518e8ac99 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -107,7 +107,7 @@ type EngineQueue struct { // The queued-up attributes safeAttributesParent eth.L2BlockRef safeAttributes *eth.PayloadAttributes - unsafePayloads PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps + unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates // Tracks which L2 blocks where last derived from which L1 block. At most finalityLookback large. finalityData []FinalityData @@ -127,18 +127,14 @@ var _ EngineControl = (*EngineQueue)(nil) // NewEngineQueue creates a new EngineQueue, which should be Reset(origin) before use. func NewEngineQueue(log log.Logger, cfg *rollup.Config, engine Engine, metrics Metrics, prev NextAttributesProvider, l1Fetcher L1Fetcher) *EngineQueue { return &EngineQueue{ - log: log, - cfg: cfg, - engine: engine, - metrics: metrics, - finalityData: make([]FinalityData, 0, finalityLookback), - unsafePayloads: PayloadsQueue{ - MaxSize: maxUnsafePayloadsMemory, - SizeFn: payloadMemSize, - blockNos: make(map[uint64]bool), - }, - prev: prev, - l1Fetcher: l1Fetcher, + log: log, + cfg: cfg, + engine: engine, + metrics: metrics, + finalityData: make([]FinalityData, 0, finalityLookback), + unsafePayloads: NewPayloadsQueue(maxUnsafePayloadsMemory, payloadMemSize), + prev: prev, + l1Fetcher: l1Fetcher, } } diff --git a/op-node/rollup/derive/payloads_queue.go b/op-node/rollup/derive/payloads_queue.go index 10f7215562e15..456df3f69ea6f 100644 --- a/op-node/rollup/derive/payloads_queue.go +++ b/op-node/rollup/derive/payloads_queue.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum-optimism/optimism/op-node/eth" ) @@ -48,8 +50,8 @@ func (pq *payloadsByNumber) Pop() any { } const ( - // ~580 bytes per payload, with some margin for overhead - payloadMemFixedCost uint64 = 600 + // ~580 bytes per payload, with some margin for overhead like map data + payloadMemFixedCost uint64 = 800 // 24 bytes per tx overhead (size of slice header in memory) payloadTxMemOverhead uint64 = 24 ) @@ -72,15 +74,25 @@ func payloadMemSize(p *eth.ExecutionPayload) uint64 { // without the need to use heap.Push/heap.Pop as caller. // PayloadsQueue maintains a MaxSize by counting and tracking sizes of added eth.ExecutionPayload entries. // When the size grows too large, the first (lowest block-number) payload is removed from the queue. -// PayloadsQueue allows entries with same block number, or even full duplicates. +// PayloadsQueue allows entries with same block number, but does not allow duplicate blocks type PayloadsQueue struct { pq payloadsByNumber currentSize uint64 MaxSize uint64 - blockNos map[uint64]bool + blockHashes map[common.Hash]struct{} SizeFn func(p *eth.ExecutionPayload) uint64 } +func NewPayloadsQueue(maxSize uint64, sizeFn func(p *eth.ExecutionPayload) uint64) *PayloadsQueue { + return &PayloadsQueue{ + pq: nil, + currentSize: 0, + MaxSize: maxSize, + blockHashes: make(map[common.Hash]struct{}), + SizeFn: sizeFn, + } +} + func (upq *PayloadsQueue) Len() int { return len(upq.pq) } @@ -100,8 +112,8 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { if p == nil { return errors.New("cannot add nil payload") } - if upq.blockNos[p.ID().Number] { - return errors.New("cannot add duplicate payload") + if _, ok := upq.blockHashes[p.BlockHash]; ok { + return fmt.Errorf("cannot add duplicate payload %s", p.ID()) } size := upq.SizeFn(p) if size > upq.MaxSize { @@ -115,7 +127,7 @@ func (upq *PayloadsQueue) Push(p *eth.ExecutionPayload) error { for upq.currentSize > upq.MaxSize { upq.Pop() } - upq.blockNos[p.ID().Number] = true + upq.blockHashes[p.BlockHash] = struct{}{} return nil } @@ -137,7 +149,7 @@ func (upq *PayloadsQueue) Pop() *eth.ExecutionPayload { } ps := heap.Pop(&upq.pq).(payloadAndSize) // nosemgrep upq.currentSize -= ps.size - // remove the key from the blockNos map - delete(upq.blockNos, ps.payload.ID().Number) + // remove the key from the block hashes map + delete(upq.blockHashes, ps.payload.BlockHash) return ps.payload } diff --git a/op-node/rollup/derive/payloads_queue_test.go b/op-node/rollup/derive/payloads_queue_test.go index 0b244b98f5e4f..42d279c4c2fa0 100644 --- a/op-node/rollup/derive/payloads_queue_test.go +++ b/op-node/rollup/derive/payloads_queue_test.go @@ -74,11 +74,7 @@ func TestPayloadMemSize(t *testing.T) { } func TestPayloadsQueue(t *testing.T) { - pq := PayloadsQueue{ - MaxSize: payloadMemFixedCost * 3, - SizeFn: payloadMemSize, - blockNos: make(map[uint64]bool), - } + pq := NewPayloadsQueue(payloadMemFixedCost * 3, payloadMemSize) require.Equal(t, 0, pq.Len()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop()) From 163d0720830d146f4aa9effffe2274f806b0eecd Mon Sep 17 00:00:00 2001 From: protolambda Date: Wed, 22 Mar 2023 19:44:37 +0100 Subject: [PATCH 6/6] op-node: fix payloads queue test format and test-duplicates case --- op-node/rollup/derive/payloads_queue_test.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/op-node/rollup/derive/payloads_queue_test.go b/op-node/rollup/derive/payloads_queue_test.go index 42d279c4c2fa0..27c7363fefe19 100644 --- a/op-node/rollup/derive/payloads_queue_test.go +++ b/op-node/rollup/derive/payloads_queue_test.go @@ -4,6 +4,7 @@ import ( "container/heap" "testing" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -74,16 +75,17 @@ func TestPayloadMemSize(t *testing.T) { } func TestPayloadsQueue(t *testing.T) { - pq := NewPayloadsQueue(payloadMemFixedCost * 3, payloadMemSize) + pq := NewPayloadsQueue(payloadMemFixedCost*3, payloadMemSize) require.Equal(t, 0, pq.Len()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Peek()) require.Equal(t, (*eth.ExecutionPayload)(nil), pq.Pop()) - a := ð.ExecutionPayload{BlockNumber: 3} - b := ð.ExecutionPayload{BlockNumber: 4} - c := ð.ExecutionPayload{BlockNumber: 5} - d := ð.ExecutionPayload{BlockNumber: 6} - bAlt := ð.ExecutionPayload{BlockNumber: 4} + a := ð.ExecutionPayload{BlockNumber: 3, BlockHash: common.Hash{3}} + b := ð.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}} + c := ð.ExecutionPayload{BlockNumber: 5, BlockHash: common.Hash{5}} + d := ð.ExecutionPayload{BlockNumber: 6, BlockHash: common.Hash{6}} + bAlt := ð.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{0xff}} + bDup := ð.ExecutionPayload{BlockNumber: 4, BlockHash: common.Hash{4}} require.NoError(t, pq.Push(b)) require.Equal(t, pq.Len(), 1) require.Equal(t, pq.Peek(), b) @@ -126,7 +128,9 @@ func TestPayloadsQueue(t *testing.T) { require.Equal(t, pq.Peek(), a) // No duplicates allowed - require.Error(t, pq.Push(bAlt)) + require.Error(t, pq.Push(bDup)) + // But reorg data allowed + require.NoError(t, pq.Push(bAlt)) require.NoError(t, pq.Push(d)) require.Equal(t, pq.Len(), 3)