diff --git a/go.mod b/go.mod index 01cea5e00bab8..94804df9ac87b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d + github.com/hashicorp/golang-lru/v2 v2.0.1 github.com/holiman/uint256 v1.2.0 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-leveldb v0.5.0 @@ -86,7 +87,6 @@ require ( github.com/graph-gophers/graphql-go v1.3.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-bexpr v0.1.11 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/huin/goupnp v1.1.0 // indirect github.com/influxdata/influxdb v1.8.3 // indirect diff --git a/op-e2e/actions/l2_verifier.go b/op-e2e/actions/l2_verifier.go index d81ab588151cc..d1293ecc7c6fc 100644 --- a/op-e2e/actions/l2_verifier.go +++ b/op-e2e/actions/l2_verifier.go @@ -143,6 +143,7 @@ func (s *L2Verifier) SyncStatus() *eth.SyncStatus { UnsafeL2: s.L2Unsafe(), SafeL2: s.L2Safe(), FinalizedL2: s.L2Finalized(), + UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), } } diff --git a/op-e2e/setup.go b/op-e2e/setup.go index 967f707fc93f7..58864c5a57f75 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -206,6 +206,9 @@ type SystemConfig struct { // Any node name not in the topology will not have p2p enabled. P2PTopology map[string][]string + // Enables req-resp sync in the P2P nodes + P2PReqRespSync bool + // If the proposer can make proposals for L2 blocks derived from L1 blocks which are not finalized on L1 yet. NonFinalizedProposals bool @@ -218,6 +221,8 @@ type System struct { RollupConfig *rollup.Config + L2GenesisCfg *core.Genesis + // Connections to running nodes Nodes map[string]*node.Node Backends map[string]*geth_eth.Ethereum @@ -329,6 +334,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { if err != nil { return nil, err } + sys.L2GenesisCfg = l2Genesis for addr, amount := range cfg.Premine { if existing, ok := l2Genesis.Alloc[addr]; ok { l2Genesis.Alloc[addr] = core.GenesisAccount{ @@ -411,30 +417,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { // Configure connections to L1 and L2 for rollup nodes. // TODO: refactor testing to use in-process rpc connections instead of websockets. - l1EndpointConfig := l1Node.WSEndpoint() - useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true" - if useHTTP { - log.Info("using HTTP client") - l1EndpointConfig = l1Node.HTTPEndpoint() - } - for name, rollupCfg := range cfg.Nodes { - l2EndpointConfig := sys.Nodes[name].WSAuthEndpoint() - if useHTTP { - l2EndpointConfig = sys.Nodes[name].HTTPAuthEndpoint() - } - rollupCfg.L1 = &rollupNode.L1EndpointConfig{ - L1NodeAddr: l1EndpointConfig, - L1TrustRPC: false, - L1RPCKind: sources.RPCKindBasic, - RateLimit: 0, - BatchSize: 20, - HttpPollInterval: time.Duration(cfg.DeployConfig.L1BlockTime) * time.Second / 10, - } - rollupCfg.L2 = &rollupNode.L2EndpointConfig{ - L2EngineAddr: l2EndpointConfig, - L2EngineJWTSecret: cfg.JWTSecret, - } + configureL1(rollupCfg, l1Node) + configureL2(rollupCfg, sys.Nodes[name], cfg.JWTSecret) + rollupCfg.L2Sync = &rollupNode.PreparedL2SyncEndpoint{ Client: nil, TrustRPC: false, @@ -486,9 +472,10 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { // TODO we can enable discv5 in the testnodes to test discovery of new peers. // Would need to mock though, and the discv5 implementation does not provide nice mocks here. p := &p2p.Prepared{ - HostP2P: h, - LocalNode: nil, - UDPv5: nil, + HostP2P: h, + LocalNode: nil, + UDPv5: nil, + EnableReqRespSync: cfg.P2PReqRespSync, } p2pNodes[name] = p return p, nil @@ -632,6 +619,35 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { return sys, nil } +func configureL1(rollupNodeCfg *rollupNode.Config, l1Node *node.Node) { + l1EndpointConfig := l1Node.WSEndpoint() + useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true" + if useHTTP { + log.Info("using HTTP client") + l1EndpointConfig = l1Node.HTTPEndpoint() + } + rollupNodeCfg.L1 = &rollupNode.L1EndpointConfig{ + L1NodeAddr: l1EndpointConfig, + L1TrustRPC: false, + L1RPCKind: sources.RPCKindBasic, + RateLimit: 0, + BatchSize: 20, + HttpPollInterval: time.Millisecond * 100, + } +} +func configureL2(rollupNodeCfg *rollupNode.Config, l2Node *node.Node, jwtSecret [32]byte) { + useHTTP := os.Getenv("OP_E2E_USE_HTTP") == "true" + l2EndpointConfig := l2Node.WSAuthEndpoint() + if useHTTP { + l2EndpointConfig = l2Node.HTTPAuthEndpoint() + } + + rollupNodeCfg.L2 = &rollupNode.L2EndpointConfig{ + L2EngineAddr: l2EndpointConfig, + L2EngineJWTSecret: jwtSecret, + } +} + func (cfg SystemConfig) L1ChainIDBig() *big.Int { return new(big.Int).SetUint64(cfg.DeployConfig.L1ChainID) } diff --git a/op-e2e/system_test.go b/op-e2e/system_test.go index 6aa183a6fc9a0..c40411fa03189 100644 --- a/op-e2e/system_test.go +++ b/op-e2e/system_test.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/metrics" rollupNode "github.com/ethereum-optimism/optimism/op-node/node" "github.com/ethereum-optimism/optimism/op-node/p2p" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -35,6 +36,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/sources" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/withdrawals" + oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" ) var enableParallelTesting bool = true @@ -737,6 +739,159 @@ func TestSystemRPCAltSync(t *testing.T) { require.ElementsMatch(t, received, published[:len(received)]) } +func TestSystemP2PAltSync(t *testing.T) { + parallel(t) + if !verboseGethNodes { + log.Root().SetHandler(log.DiscardHandler()) + } + + cfg := DefaultSystemConfig(t) + + // remove default verifier node + delete(cfg.Nodes, "verifier") + // Add more verifier nodes + cfg.Nodes["alice"] = &rollupNode.Config{ + Driver: driver.Config{ + VerifierConfDepth: 0, + SequencerConfDepth: 0, + SequencerEnabled: false, + }, + L1EpochPollInterval: time.Second * 4, + } + cfg.Nodes["bob"] = &rollupNode.Config{ + Driver: driver.Config{ + VerifierConfDepth: 0, + SequencerConfDepth: 0, + SequencerEnabled: false, + }, + L1EpochPollInterval: time.Second * 4, + } + cfg.Loggers["alice"] = testlog.Logger(t, log.LvlInfo).New("role", "alice") + cfg.Loggers["bob"] = testlog.Logger(t, log.LvlInfo).New("role", "bob") + + // connect the nodes + cfg.P2PTopology = map[string][]string{ + "sequencer": {"alice", "bob"}, + "alice": {"sequencer", "bob"}, + "bob": {"alice", "sequencer"}, + } + // Enable the P2P req-resp based sync + cfg.P2PReqRespSync = true + + // Disable batcher, so there will not be any L1 data to sync from + cfg.DisableBatcher = true + + var published []string + seqTracer := new(FnTracer) + // The sequencer still publishes the blocks to the tracer, even if they do not reach the network due to disabled P2P + seqTracer.OnPublishL2PayloadFn = func(ctx context.Context, payload *eth.ExecutionPayload) { + published = append(published, payload.ID().String()) + } + // Blocks are now received via the RPC based alt-sync method + cfg.Nodes["sequencer"].Tracer = seqTracer + + sys, err := cfg.Start() + require.Nil(t, err, "Error starting up system") + defer sys.Close() + + l2Seq := sys.Clients["sequencer"] + + // Transactor Account + ethPrivKey := cfg.Secrets.Alice + + // Submit a TX to L2 sequencer node + toAddr := common.Address{0xff, 0xff} + tx := types.MustSignNewTx(ethPrivKey, types.LatestSignerForChainID(cfg.L2ChainIDBig()), &types.DynamicFeeTx{ + ChainID: cfg.L2ChainIDBig(), + Nonce: 0, + To: &toAddr, + Value: big.NewInt(1_000_000_000), + GasTipCap: big.NewInt(10), + GasFeeCap: big.NewInt(200), + Gas: 21000, + }) + err = l2Seq.SendTransaction(context.Background(), tx) + require.Nil(t, err, "Sending L2 tx to sequencer") + + // Wait for tx to be mined on the L2 sequencer chain + receiptSeq, err := waitForTransaction(tx.Hash(), l2Seq, 6*time.Duration(sys.RollupConfig.BlockTime)*time.Second) + require.Nil(t, err, "Waiting for L2 tx on sequencer") + + // Gossip is able to respond to IWANT messages for the duration of heartbeat_time * message_window = 0.5 * 12 = 6 + // Wait till we pass that, and then we'll have missed some blocks that cannot be retrieved in any way from gossip + time.Sleep(time.Second * 10) + + // set up our syncer node, connect it to alice/bob + cfg.Loggers["syncer"] = testlog.Logger(t, log.LvlInfo).New("role", "syncer") + snapLog := log.New() + snapLog.SetHandler(log.DiscardHandler()) + + // Create a peer, and hook up alice and bob + h, err := sys.Mocknet.GenPeer() + require.NoError(t, err) + _, err = sys.Mocknet.LinkPeers(sys.RollupNodes["alice"].P2P().Host().ID(), h.ID()) + require.NoError(t, err) + _, err = sys.Mocknet.LinkPeers(sys.RollupNodes["bob"].P2P().Host().ID(), h.ID()) + require.NoError(t, err) + + // Configure the new rollup node that'll be syncing + var syncedPayloads []string + syncNodeCfg := &rollupNode.Config{ + L2Sync: &rollupNode.PreparedL2SyncEndpoint{Client: nil}, + Driver: driver.Config{VerifierConfDepth: 0}, + Rollup: *sys.RollupConfig, + P2PSigner: nil, + RPC: rollupNode.RPCConfig{ + ListenAddr: "127.0.0.1", + ListenPort: 0, + EnableAdmin: true, + }, + P2P: &p2p.Prepared{HostP2P: h, EnableReqRespSync: true}, + Metrics: rollupNode.MetricsConfig{Enabled: false}, // no metrics server + Pprof: oppprof.CLIConfig{}, + L1EpochPollInterval: time.Second * 10, + Tracer: &FnTracer{ + OnUnsafeL2PayloadFn: func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) { + syncedPayloads = append(syncedPayloads, payload.ID().String()) + }, + }, + } + configureL1(syncNodeCfg, sys.Nodes["l1"]) + syncerL2Engine, _, err := initL2Geth("syncer", big.NewInt(int64(cfg.DeployConfig.L2ChainID)), sys.L2GenesisCfg, cfg.JWTFilePath) + require.NoError(t, err) + require.NoError(t, syncerL2Engine.Start()) + + configureL2(syncNodeCfg, syncerL2Engine, cfg.JWTSecret) + + syncerNode, err := rollupNode.New(context.Background(), syncNodeCfg, cfg.Loggers["syncer"], snapLog, "", metrics.NewMetrics("")) + require.NoError(t, err) + err = syncerNode.Start(context.Background()) + require.NoError(t, err) + + // connect alice and bob to our new syncer node + _, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["alice"].P2P().Host().ID(), syncerNode.P2P().Host().ID()) + require.NoError(t, err) + _, err = sys.Mocknet.ConnectPeers(sys.RollupNodes["bob"].P2P().Host().ID(), syncerNode.P2P().Host().ID()) + require.NoError(t, err) + + rpc, err := syncerL2Engine.Attach() + require.NoError(t, err) + l2Verif := ethclient.NewClient(rpc) + + // It may take a while to sync, but eventually we should see the sequenced data show up + receiptVerif, err := waitForTransaction(tx.Hash(), l2Verif, 100*time.Duration(sys.RollupConfig.BlockTime)*time.Second) + require.Nil(t, err, "Waiting for L2 tx on verifier") + + require.Equal(t, receiptSeq, receiptVerif) + + // Verify that the tx was received via P2P sync + require.Contains(t, syncedPayloads, eth.BlockID{Hash: receiptVerif.BlockHash, Number: receiptVerif.BlockNumber.Uint64()}.String()) + + // Verify that everything that was received was published + require.GreaterOrEqual(t, len(published), len(syncedPayloads)) + require.ElementsMatch(t, syncedPayloads, published[:len(syncedPayloads)]) +} + // TestSystemDenseTopology sets up a dense p2p topology with 3 verifier nodes and 1 sequencer node. func TestSystemDenseTopology(t *testing.T) { t.Skip("Skipping dense topology test to avoid flakiness. @refcell address in p2p scoring pr.") diff --git a/op-node/eth/sync_status.go b/op-node/eth/sync_status.go index cb180ae2572b5..66b6162df1ad0 100644 --- a/op-node/eth/sync_status.go +++ b/op-node/eth/sync_status.go @@ -32,4 +32,7 @@ type SyncStatus struct { // FinalizedL2 points to the L2 block that was derived fully from // finalized L1 information, thus irreversible. FinalizedL2 L2BlockRef `json:"finalized_l2"` + // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. + // It may be zeroed if there is no targeted block. + UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` } diff --git a/op-node/flags/p2p_flags.go b/op-node/flags/p2p_flags.go index 34138fcdeb502..042d8ecb9cbba 100644 --- a/op-node/flags/p2p_flags.go +++ b/op-node/flags/p2p_flags.go @@ -276,6 +276,12 @@ var ( Hidden: true, EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"), } + SyncReqRespFlag = cli.BoolFlag{ + Name: "p2p.sync.req-resp", + Usage: "Enables experimental P2P req-resp alternative sync method, on both server and client side.", + Required: false, + EnvVar: p2pEnv("SYNC_REQ_RESP"), + } ) // None of these flags are strictly required. @@ -315,4 +321,5 @@ var p2pFlags = []cli.Flag{ GossipMeshDhiFlag, GossipMeshDlazyFlag, GossipFloodPublishFlag, + SyncReqRespFlag, } diff --git a/op-node/metrics/metrics.go b/op-node/metrics/metrics.go index 087bc2cb16d94..2fea08f92ab22 100644 --- a/op-node/metrics/metrics.go +++ b/op-node/metrics/metrics.go @@ -66,6 +66,9 @@ type Metricer interface { Document() []metrics.DocumentedMetric // P2P Metrics SetPeerScores(scores map[string]float64) + ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) + ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) + PayloadsQuarantineSize(n int) } // Metrics tracks all the metrics for the op-node. @@ -90,6 +93,12 @@ type Metrics struct { SequencingErrors *EventMetrics PublishingErrors *EventMetrics + P2PReqDurationSeconds *prometheus.HistogramVec + P2PReqTotal *prometheus.CounterVec + P2PPayloadByNumber *prometheus.GaugeVec + + PayloadsQuarantineTotal prometheus.Gauge + SequencerInconsistentL1Origin *EventMetrics SequencerResets *EventMetrics @@ -322,6 +331,44 @@ func NewMetrics(procName string) *Metrics { "direction", }), + P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: ns, + Subsystem: "p2p", + Name: "req_duration_seconds", + Buckets: []float64{}, + Help: "Duration of P2P requests", + }, []string{ + "p2p_role", // "client" or "server" + "p2p_method", + "result_code", + }), + + P2PReqTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: "p2p", + Name: "req_total", + Help: "Number of P2P requests", + }, []string{ + "p2p_role", // "client" or "server" + "p2p_method", + "result_code", + }), + + P2PPayloadByNumber: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Subsystem: "p2p", + Name: "payload_by_number", + Help: "Payload by number requests", + }, []string{ + "p2p_role", // "client" or "server" + }), + PayloadsQuarantineTotal: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Subsystem: "p2p", + Name: "payloads_quarantine_total", + Help: "number of unverified execution payloads buffered in quarantine", + }), + SequencerBuildingDiffDurationSeconds: factory.NewHistogram(prometheus.HistogramOpts{ Namespace: ns, Name: "sequencer_building_diff_seconds", @@ -567,6 +614,27 @@ func (m *Metrics) Document() []metrics.DocumentedMetric { return m.factory.Document() } +func (m *Metrics) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) { + if resultCode > 4 { // summarize all high codes to reduce metrics overhead + resultCode = 5 + } + code := strconv.FormatUint(uint64(resultCode), 10) + m.P2PReqTotal.WithLabelValues("client", "payload_by_number", code).Inc() + m.P2PReqDurationSeconds.WithLabelValues("client", "payload_by_number", code).Observe(float64(duration) / float64(time.Second)) + m.P2PPayloadByNumber.WithLabelValues("client").Set(float64(num)) +} + +func (m *Metrics) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) { + code := strconv.FormatUint(uint64(resultCode), 10) + m.P2PReqTotal.WithLabelValues("server", "payload_by_number", code).Inc() + m.P2PReqDurationSeconds.WithLabelValues("server", "payload_by_number", code).Observe(float64(duration) / float64(time.Second)) + m.P2PPayloadByNumber.WithLabelValues("server").Set(float64(num)) +} + +func (m *Metrics) PayloadsQuarantineSize(n int) { + m.PayloadsQuarantineTotal.Set(float64(n)) +} + type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) @@ -660,3 +728,12 @@ func (n *noopMetricer) RecordSequencerSealingTime(duration time.Duration) { func (n *noopMetricer) Document() []metrics.DocumentedMetric { return nil } + +func (n *noopMetricer) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) { +} + +func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) { +} + +func (n *noopMetricer) PayloadsQuarantineSize(int) { +} diff --git a/op-node/node/node.go b/op-node/node/node.go index 5de10d74246d2..0dd4b89144e92 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -256,7 +256,7 @@ func (n *OpNode) initMetricsServer(ctx context.Context, cfg *Config) error { func (n *OpNode) initP2P(ctx context.Context, cfg *Config) error { if cfg.P2P != nil { - p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.runCfg, n.metrics) + p2pNode, err := p2p.NewNodeP2P(n.resourcesCtx, &cfg.Rollup, n.log, cfg.P2P, n, n.l2Source, n.runCfg, n.metrics) if err != nil || p2pNode == nil { return err } @@ -373,11 +373,14 @@ func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, payload *e return nil } -func (n *OpNode) RequestL2Range(ctx context.Context, start, end uint64) error { +func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { if n.rpcSync != nil { return n.rpcSync.RequestL2Range(ctx, start, end) } - n.log.Debug("ignoring request to sync L2 range, no sync method available") + if n.p2pNode != nil && n.p2pNode.AltSyncEnabled() { + return n.p2pNode.RequestL2Range(ctx, start, end) + } + n.log.Debug("ignoring request to sync L2 range, no sync method available", "start", start, "end", end) return nil } diff --git a/op-node/node/server_test.go b/op-node/node/server_test.go index ec617e9987066..9f89fc7edd035 100644 --- a/op-node/node/server_test.go +++ b/op-node/node/server_test.go @@ -166,6 +166,7 @@ func randomSyncStatus(rng *rand.Rand) *eth.SyncStatus { UnsafeL2: testutils.RandomL2BlockRef(rng), SafeL2: testutils.RandomL2BlockRef(rng), FinalizedL2: testutils.RandomL2BlockRef(rng), + UnsafeL2SyncTarget: testutils.RandomL2BlockRef(rng), } } diff --git a/op-node/p2p/cli/load_config.go b/op-node/p2p/cli/load_config.go index 96d4dbf2f5dc5..7bfbeb159ec0d 100644 --- a/op-node/p2p/cli/load_config.go +++ b/op-node/p2p/cli/load_config.go @@ -73,6 +73,8 @@ func NewConfig(ctx *cli.Context, blockTime uint64) (*p2p.Config, error) { conf.ConnGater = p2p.DefaultConnGater conf.ConnMngr = p2p.DefaultConnManager + conf.EnableReqRespSync = ctx.GlobalBool(flags.SyncReqRespFlag.Name) + return conf, nil } diff --git a/op-node/p2p/config.go b/op-node/p2p/config.go index da375ad003b93..432ffd2372378 100644 --- a/op-node/p2p/config.go +++ b/op-node/p2p/config.go @@ -40,6 +40,7 @@ type SetupP2P interface { Discovery(log log.Logger, rollupCfg *rollup.Config, tcpPort uint16) (*enode.LocalNode, *discover.UDPv5, error) TargetPeers() uint GossipSetupConfigurables + ReqRespSyncEnabled() bool } // Config sets up a p2p host and discv5 service from configuration. @@ -50,6 +51,9 @@ type Config struct { DisableP2P bool NoDiscovery bool + // Enable P2P-based alt-syncing method (req-resp protocol, not gossip) + AltSync bool + // Pubsub Scoring Parameters PeerScoring pubsub.PeerScoreParams TopicScoring pubsub.TopicScoreParams @@ -104,6 +108,8 @@ type Config struct { ConnGater func(conf *Config) (connmgr.ConnectionGater, error) ConnMngr func(conf *Config) (connmgr.ConnManager, error) + + EnableReqRespSync bool } //go:generate mockery --name ConnectionGater @@ -166,6 +172,10 @@ func (conf *Config) TopicScoringParams() *pubsub.TopicScoreParams { return &conf.TopicScoring } +func (conf *Config) ReqRespSyncEnabled() bool { + return conf.EnableReqRespSync +} + const maxMeshParam = 1000 func (conf *Config) Check() error { diff --git a/op-node/p2p/host_test.go b/op-node/p2p/host_test.go index b135b2f9d738f..6ff3823028af7 100644 --- a/op-node/p2p/host_test.go +++ b/op-node/p2p/host_test.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/testlog" "github.com/ethereum-optimism/optimism/op-node/testutils" @@ -125,7 +126,7 @@ func TestP2PFull(t *testing.T) { runCfgB := &testutils.MockRuntimeConfig{P2PSeqAddress: common.Address{0x42}} logA := testlog.Logger(t, log.LvlError).New("host", "A") - nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, runCfgA, nil) + nodeA, err := NewNodeP2P(context.Background(), &rollup.Config{}, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics) require.NoError(t, err) defer nodeA.Close() @@ -148,7 +149,7 @@ func TestP2PFull(t *testing.T) { logB := testlog.Logger(t, log.LvlError).New("host", "B") - nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, runCfgB, nil) + nodeB, err := NewNodeP2P(context.Background(), &rollup.Config{}, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics) require.NoError(t, err) defer nodeB.Close() hostB := nodeB.Host() @@ -277,7 +278,7 @@ func TestDiscovery(t *testing.T) { resourcesCtx, resourcesCancel := context.WithCancel(context.Background()) defer resourcesCancel() - nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, runCfgA, nil) + nodeA, err := NewNodeP2P(context.Background(), rollupCfg, logA, &confA, &mockGossipIn{}, nil, runCfgA, metrics.NoopMetrics) require.NoError(t, err) defer nodeA.Close() hostA := nodeA.Host() @@ -292,7 +293,7 @@ func TestDiscovery(t *testing.T) { confB.DiscoveryDB = discDBC // Start B - nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, runCfgB, nil) + nodeB, err := NewNodeP2P(context.Background(), rollupCfg, logB, &confB, &mockGossipIn{}, nil, runCfgB, metrics.NoopMetrics) require.NoError(t, err) defer nodeB.Close() hostB := nodeB.Host() @@ -307,7 +308,7 @@ func TestDiscovery(t *testing.T) { }}) // Start C - nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, runCfgC, nil) + nodeC, err := NewNodeP2P(context.Background(), rollupCfg, logC, &confC, &mockGossipIn{}, nil, runCfgC, metrics.NoopMetrics) require.NoError(t, err) defer nodeC.Close() hostC := nodeC.Host() diff --git a/op-node/p2p/node.go b/op-node/p2p/node.go index ef1b92a31ff65..378acf4eb1adb 100644 --- a/op-node/p2p/node.go +++ b/op-node/p2p/node.go @@ -11,8 +11,10 @@ import ( "github.com/libp2p/go-libp2p/core/connmgr" "github.com/libp2p/go-libp2p/core/host" p2pmetrics "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/network" ma "github.com/multiformats/go-multiaddr" + "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum/go-ethereum/log" @@ -32,16 +34,18 @@ type NodeP2P struct { dv5Udp *discover.UDPv5 // p2p discovery service gs *pubsub.PubSub // p2p gossip router gsOut GossipOut // p2p gossip application interface for publishing + syncCl *SyncClient + syncSrv *ReqRespServer } // NewNodeP2P creates a new p2p node, and returns a reference to it. If the p2p is disabled, it returns nil. // If metrics are configured, a bandwidth monitor will be spawned in a goroutine. -func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) { +func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) (*NodeP2P, error) { if setup == nil { return nil, errors.New("p2p node cannot be created without setup") } var n NodeP2P - if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, runCfg, metrics); err != nil { + if err := n.init(resourcesCtx, rollupCfg, log, setup, gossipIn, l2Chain, runCfg, metrics); err != nil { closeErr := n.Close() if closeErr != nil { log.Error("failed to close p2p after starting with err", "closeErr", closeErr, "err", err) @@ -54,7 +58,7 @@ func NewNodeP2P(resourcesCtx context.Context, rollupCfg *rollup.Config, log log. return &n, nil } -func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error { +func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, log log.Logger, setup SetupP2P, gossipIn GossipIn, l2Chain L2Chain, runCfg GossipRuntimeConfig, metrics metrics.Metricer) error { bwc := p2pmetrics.NewBandwidthCounter() var err error @@ -73,6 +77,29 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l n.gater = extra.ConnectionGater() n.connMgr = extra.ConnectionManager() } + // Activate the P2P req-resp sync if enabled by feature-flag. + if setup.ReqRespSyncEnabled() { + n.syncCl = NewSyncClient(log, rollupCfg, n.host.NewStream, gossipIn.OnUnsafeL2Payload, metrics) + n.host.Network().Notify(&network.NotifyBundle{ + ConnectedF: func(nw network.Network, conn network.Conn) { + n.syncCl.AddPeer(conn.RemotePeer()) + }, + DisconnectedF: func(nw network.Network, conn network.Conn) { + n.syncCl.RemovePeer(conn.RemotePeer()) + }, + }) + n.syncCl.Start() + // the host may already be connected to peers, add them all to the sync client + for _, peerID := range n.host.Network().Peers() { + n.syncCl.AddPeer(peerID) + } + if l2Chain != nil { // Only enable serving side of req-resp sync if we have a data-source, to make minimal P2P testing easy + n.syncSrv = NewReqRespServer(rollupCfg, l2Chain, metrics) + // register the sync protocol with libp2p host + payloadByNumber := MakeStreamHandler(resourcesCtx, log.New("serve", "payloads_by_number"), n.syncSrv.HandleSyncRequest) + n.host.SetStreamHandler(PayloadByNumberProtocolID(rollupCfg.L2ChainID), payloadByNumber) + } + } // notify of any new connections/streams/etc. n.host.Network().Notify(NewNetworkNotifier(log, metrics)) // note: the IDDelta functionality was removed from libP2P, and no longer needs to be explicitly disabled. @@ -104,6 +131,17 @@ func (n *NodeP2P) init(resourcesCtx context.Context, rollupCfg *rollup.Config, l return nil } +func (n *NodeP2P) AltSyncEnabled() bool { + return n.syncCl != nil +} + +func (n *NodeP2P) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { + if !n.AltSyncEnabled() { + return fmt.Errorf("cannot request range %s - %s, req-resp sync is not enabled", start, end) + } + return n.syncCl.RequestL2Range(ctx, start, end) +} + func (n *NodeP2P) Host() host.Host { return n.host } @@ -146,6 +184,11 @@ func (n *NodeP2P) Close() error { if err := n.host.Close(); err != nil { result = multierror.Append(result, fmt.Errorf("failed to close p2p host cleanly: %w", err)) } + if n.syncCl != nil { + if err := n.syncCl.Close(); err != nil { + result = multierror.Append(result, fmt.Errorf("failed to close p2p sync client cleanly: %w", err)) + } + } } return result.ErrorOrNil() } diff --git a/op-node/p2p/prepared.go b/op-node/p2p/prepared.go index 00b8bbb0f5b03..328ca1e47dc1d 100644 --- a/op-node/p2p/prepared.go +++ b/op-node/p2p/prepared.go @@ -22,6 +22,8 @@ type Prepared struct { HostP2P host.Host LocalNode *enode.LocalNode UDPv5 *discover.UDPv5 + + EnableReqRespSync bool } var _ SetupP2P = (*Prepared)(nil) @@ -83,3 +85,7 @@ func (p *Prepared) TopicScoringParams() *pubsub.TopicScoreParams { func (p *Prepared) Disabled() bool { return false } + +func (p *Prepared) ReqRespSyncEnabled() bool { + return p.EnableReqRespSync +} diff --git a/op-node/p2p/sync.go b/op-node/p2p/sync.go new file mode 100644 index 0000000000000..f6a4d050b8dae --- /dev/null +++ b/op-node/p2p/sync.go @@ -0,0 +1,750 @@ +package p2p + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/golang/snappy" + "github.com/hashicorp/golang-lru/v2/simplelru" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "golang.org/x/time/rate" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" +) + +// StreamCtxFn provides a new context to use when handling stream requests +type StreamCtxFn func() context.Context + +// Note: the mocknet in testing does not support read/write stream timeouts, the timeouts are only applied if available. +// Rate-limits always apply, and are making sure the request/response throughput is not too fast, instead of too slow. +const ( + // timeout for opening a req-resp stream to another peer. This may involve some protocol negotiation. + streamTimeout = time.Second * 5 + // timeout for writing the request as client. Can be as long as serverReadRequestTimeout + clientWriteRequestTimeout = time.Second * 10 + // timeout for reading a response of a serving peer as client. Can be as long as serverWriteChunkTimeout + clientReadResponsetimeout = time.Second * 10 + // timeout for reading the request content, deny the request if it cannot be fully read in time + serverReadRequestTimeout = time.Second * 10 + // timeout for writing a single response message chunk + // (if a future response consists of multiple chunks, reset the writing timeout per chunk) + serverWriteChunkTimeout = time.Second * 10 + // after the rate-limit reservation hits the max throttle delay, give up on serving a request and just close the stream + maxThrottleDelay = time.Second * 20 + // Do not serve more than 20 requests per second + globalServerBlocksRateLimit rate.Limit = 20 + // Allow up to 5 concurrent requests to be served, eating into our rate-limit + globalServerBlocksBurst = 5 + // Do not serve more than 5 requests per second to the same peer, so we can serve other peers at the same time + peerServerBlocksRateLimit rate.Limit = 5 + // Allow a peer to burst 3 requests, so it does not have to wait + peerServerBlocksBurst = 3 + // If the client hits a request error, it counts as a lot of rate-limit tokens for syncing from that peer: + // we rather sync from other servers. We'll try again later, + // and eventually kick the peer based on degraded scoring if it's really not serving us well. + clientErrRateCost = 100 +) + +func PayloadByNumberProtocolID(l2ChainID *big.Int) protocol.ID { + return protocol.ID(fmt.Sprintf("/opstack/req/payload_by_number/%d/0", l2ChainID)) +} + +type requestHandlerFn func(ctx context.Context, log log.Logger, stream network.Stream) + +func MakeStreamHandler(resourcesCtx context.Context, log log.Logger, fn requestHandlerFn) network.StreamHandler { + return func(stream network.Stream) { + log := log.New("peer", stream.Conn().ID(), "remote", stream.Conn().RemoteMultiaddr()) + defer func() { + if err := recover(); err != nil { + log.Error("p2p server request handling panic", "err", err, "protocol", stream.Protocol()) + } + }() + defer stream.Close() + fn(resourcesCtx, log, stream) + } +} + +type newStreamFn func(ctx context.Context, peerId peer.ID, protocolId ...protocol.ID) (network.Stream, error) + +type receivePayloadFn func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error + +type rangeRequest struct { + start uint64 + end eth.L2BlockRef +} + +type syncResult struct { + payload *eth.ExecutionPayload + peer peer.ID +} + +type peerRequest struct { + num uint64 + + complete *atomic.Bool +} + +type SyncClientMetrics interface { + ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) + PayloadsQuarantineSize(n int) +} + +// SyncClient implements a reverse chain sync with a minimal interface: +// signal the desired range, and receive blocks within this range back. +// Through parent-hash verification, received blocks are all ensured to be part of the canonical chain at one point, +// but it is up to the user to organize and process the results further. +// +// For the sync-client to retrieve any data, peers must be added with AddPeer(id), and removed upon disconnect with RemovePeer(id). +// The client is started with Start(), and may be started before or after changing any peers. +// +// ### Stages +// +// The sync mechanism is implemented as following: +// - User sends range request: blocks on sync main loop (with ctx timeout) +// - Main loop processes range request (from high to low), dividing block requests by number between parallel peers. +// - The high part of the range has a known block-hash, and is marked as trusted. +// - Once there are no more peers available for buffering requests, we stop the range request processing. +// - Every request buffered for a peer is tracked as in-flight, by block number. +// - In-flight requests are not repeated +// - Requests for data that's already in the quarantine are not repeated +// - Data already in the quarantine that is trusted is attempted to be promoted. +// +// - Peers each have their own routine for processing requests. +// - They fetch the requested block by number, parse and validate it, and then send it back to the main loop +// - If peers fail to fetch or process it, or fail to send it back to the main loop within timeout, +// then the doRequest returns an error. It then marks the in-flight request as completed. +// +// - Main loop receives results synchronously with the range requests +// - The result is removed from in-flight tracker +// - The result is added to the quarantine +// - If we trust the hash, we try to promote the result. +// +// ### Concepts +// +// The main concepts are: +// - Quarantine: an LRU that stores the latest fetched block data, by hash as well as an extra index by number. +// +// - Quarantine eviction: upon regular LRU eviction, or explicit removal (when we learn data is not canonical), +// the sync result is removed from quarantine without being forwarded to the receiver. +// The peer that provided the data may be down-scored for providing un-utilized data if the data +// is not trusted during eviction. +// +// - Trusted data: data becomes trusted through 2 ways: +// - The hash / parent-hash of the sync target is marked as trusted. +// - The parent-hash of any promoted data is marked as trusted. +// +// - The trusted-data is maintained in LRU: we only care about the recent accessed blocks. +// +// - Result promotion: content from the quarantine is "promoted" when we find the blockhash is trusted. +// The data is removed from the quarantine, and forwarded to the receiver. +// +// ### Usage +// +// The user is expected to request the range of blocks between its existing chain head, +// and a trusted future block-hash as reference to sync towards. +// Upon receiving results from the sync-client, the user should adjust down its sync-target +// based on the received results, to avoid duplicating work when req-requesting an updated range. +// Range requests should still be repeated eventually however, as the sync client will give up on syncing a large range +// when it's too busy syncing. +// +// The rationale for this approach is that this sync mechanism is primarily intended +// for quickly filling gaps between an existing chain and a gossip chain, and not for very long block ranges. +// Syncing in the execution-layer (through snap-sync) is more appropriate for long ranges. +// If the user does sync a long range of blocks through this mechanism, +// it does end up traversing through the chain, but receives the blocks in reverse order. +// It is up to the user to persist the blocks for later processing, or drop & resync them if persistence is limited. +type SyncClient struct { + log log.Logger + + cfg *rollup.Config + + metrics SyncClientMetrics + + newStreamFn newStreamFn + payloadByNumber protocol.ID + + peersLock sync.Mutex + // syncing worker per peer + peers map[peer.ID]context.CancelFunc + + // trusted blocks are, or have been, canonical at one point. + // Everything that's trusted is acceptable to pass to the sync receiver, + // but we target to just sync the blocks of the latest canonical view of the chain. + trusted *simplelru.LRU[common.Hash, struct{}] + + // quarantine is a LRU of untrusted results: blocks that could not be verified yet + quarantine *simplelru.LRU[common.Hash, syncResult] + // quarantineByNum indexes the quarantine contents by number. + // No duplicates here, only the latest quarantine write is indexed. + // This map is cleared upon evictions of items from the quarantine LRU + quarantineByNum map[uint64]common.Hash + + // inFlight requests are not repeated + inFlight map[uint64]*atomic.Bool + + requests chan rangeRequest + peerRequests chan peerRequest + + results chan syncResult + + resCtx context.Context + resCancel context.CancelFunc + + receivePayload receivePayloadFn + wg sync.WaitGroup +} + +func NewSyncClient(log log.Logger, cfg *rollup.Config, newStream newStreamFn, rcv receivePayloadFn, metrics SyncClientMetrics) *SyncClient { + ctx, cancel := context.WithCancel(context.Background()) + + c := &SyncClient{ + log: log, + cfg: cfg, + metrics: metrics, + newStreamFn: newStream, + payloadByNumber: PayloadByNumberProtocolID(cfg.L2ChainID), + peers: make(map[peer.ID]context.CancelFunc), + quarantineByNum: make(map[uint64]common.Hash), + inFlight: make(map[uint64]*atomic.Bool), + requests: make(chan rangeRequest), // blocking + peerRequests: make(chan peerRequest, 128), + results: make(chan syncResult, 128), + resCtx: ctx, + resCancel: cancel, + receivePayload: rcv, + } + // never errors with positive LRU cache size + // TODO(CLI-3733): if we had an LRU based on on total payloads size, instead of payload count, + // we can safely buffer more data in the happy case. + q, _ := simplelru.NewLRU[common.Hash, syncResult](100, c.onQuarantineEvict) + c.quarantine = q + trusted, _ := simplelru.NewLRU[common.Hash, struct{}](10000, nil) + c.trusted = trusted + return c +} + +func (s *SyncClient) Start() { + s.wg.Add(1) + go s.mainLoop() +} + +func (s *SyncClient) AddPeer(id peer.ID) { + s.peersLock.Lock() + defer s.peersLock.Unlock() + if _, ok := s.peers[id]; ok { + s.log.Warn("cannot register peer for sync duties, peer was already registered", "peer", id) + return + } + s.wg.Add(1) + // add new peer routine + ctx, cancel := context.WithCancel(s.resCtx) + s.peers[id] = cancel + go s.peerLoop(ctx, id) +} + +func (s *SyncClient) RemovePeer(id peer.ID) { + s.peersLock.Lock() + defer s.peersLock.Unlock() + cancel, ok := s.peers[id] + if !ok { + s.log.Warn("cannot remove peer from sync duties, peer was not registered", "peer", id) + return + } + cancel() // once loop exits + delete(s.peers, id) +} + +func (s *SyncClient) Close() error { + s.resCancel() + s.wg.Wait() + return nil +} + +func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { + if end == (eth.L2BlockRef{}) { + s.log.Debug("P2P sync client received range signal, but cannot sync open-ended chain: need sync target to verify blocks through parent-hashes", "start", start) + return nil + } + // synchronize requests with the main loop for state access + select { + case s.requests <- rangeRequest{start: start.Number, end: end}: + return nil + case <-ctx.Done(): + return fmt.Errorf("too busy with P2P results/requests: %w", ctx.Err()) + } +} + +const ( + maxRequestScheduling = time.Second * 3 + maxResultProcessing = time.Second * 3 +) + +func (s *SyncClient) mainLoop() { + defer s.wg.Done() + for { + select { + case req := <-s.requests: + ctx, cancel := context.WithTimeout(s.resCtx, maxRequestScheduling) + s.onRangeRequest(ctx, req) + cancel() + case res := <-s.results: + ctx, cancel := context.WithTimeout(s.resCtx, maxResultProcessing) + s.onResult(ctx, res) + cancel() + case <-s.resCtx.Done(): + s.log.Info("stopped P2P req-resp L2 block sync client") + return + } + } +} + +// onRangeRequest is exclusively called by the main loop, and has thus direct access to the request bookkeeping state. +// This function transforms requested block ranges into work for each peer. +func (s *SyncClient) onRangeRequest(ctx context.Context, req rangeRequest) { + // add req head to trusted set of blocks + s.trusted.Add(req.end.Hash, struct{}{}) + s.trusted.Add(req.end.ParentHash, struct{}{}) + + log := s.log.New("target", req.start, "end", req.end) + + // clean up the completed in-flight requests + for k, v := range s.inFlight { + if v.Load() { + delete(s.inFlight, k) + } + } + + // Now try to fetch lower numbers than current end, to traverse back towards the updated start. + for i := uint64(0); ; i++ { + num := req.end.Number - 1 - i + if num <= req.start { + return + } + // check if we have something in quarantine already + if h, ok := s.quarantineByNum[num]; ok { + if s.trusted.Contains(h) { // if we trust it, try to promote it. + s.tryPromote(h) + } + // Don't fetch things that we have a candidate for already. + // We'll evict it from quarantine by finding a conflict, or if we sync enough other blocks + continue + } + + if _, ok := s.inFlight[num]; ok { + continue // request still in flight + } + pr := peerRequest{num: num, complete: new(atomic.Bool)} + + log.Debug("Scheduling P2P block request", "num", num) + // schedule number + select { + case s.peerRequests <- pr: + s.inFlight[num] = pr.complete + case <-ctx.Done(): + log.Info("did not schedule full P2P sync range", "current", num, "err", ctx.Err()) + return + default: // peers may all be busy processing requests already + log.Info("no peers ready to handle block requests for more P2P requests for L2 block history", "current", num) + return + } + } +} + +func (s *SyncClient) onQuarantineEvict(key common.Hash, value syncResult) { + delete(s.quarantineByNum, uint64(value.payload.BlockNumber)) + s.metrics.PayloadsQuarantineSize(s.quarantine.Len()) + if !s.trusted.Contains(key) { + s.log.Debug("evicting untrusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer) + // TODO(CLI-3732): downscore peer for having provided us a bad block that never turned out to be canonical + } else { + s.log.Debug("evicting trusted payload from quarantine", "id", value.payload.ID(), "peer", value.peer) + } +} + +func (s *SyncClient) tryPromote(h common.Hash) { + parentRes, ok := s.quarantine.Get(h) + if ok { + // Simply reschedule the result, to get it (and possibly its parents) out of quarantine without recursion. + // s.results is buffered, but skip the promotion if the channel is full as it would cause a deadlock. + select { + case s.results <- parentRes: + default: + s.log.Debug("failed to signal block for promotion: sync client is too busy", "h", h) + } + } else { + s.log.Debug("cannot find block in quarantine, nothing to promote", "h", h) + } +} + +func (s *SyncClient) promote(ctx context.Context, res syncResult) { + s.log.Debug("promoting p2p sync result", "payload", res.payload.ID(), "peer", res.peer) + if err := s.receivePayload(ctx, res.peer, res.payload); err != nil { + s.log.Warn("failed to promote payload, receiver error", "err", err) + return + } + s.trusted.Add(res.payload.BlockHash, struct{}{}) + if s.quarantine.Remove(res.payload.BlockHash) { + s.log.Debug("promoted previously p2p-synced block from quarantine to main", "id", res.payload.ID()) + } else { + s.log.Debug("promoted new p2p-synced block to main", "id", res.payload.ID()) + } + + // Mark parent block as trusted, so that we can promote it once we receive it / find it + s.trusted.Add(res.payload.ParentHash, struct{}{}) + + // Try to promote the parent block too, if any: previous unverifiable data may now be canonical + s.tryPromote(res.payload.ParentHash) + + // In case we don't have the parent, and what we have in quarantine is wrong, + // clear what we buffered in favor of fetching something else. + if h, ok := s.quarantineByNum[uint64(res.payload.BlockNumber)-1]; ok { + s.quarantine.Remove(h) + } +} + +// onResult is exclusively called by the main loop, and has thus direct access to the request bookkeeping state. +// This function verifies if the result is canonical, and either promotes the result or moves the result into quarantine. +func (s *SyncClient) onResult(ctx context.Context, res syncResult) { + s.log.Debug("processing p2p sync result", "payload", res.payload.ID(), "peer", res.peer) + // Clean up the in-flight request, we have a result now. + delete(s.inFlight, uint64(res.payload.BlockNumber)) + // Always put it in quarantine first. If promotion fails because the receiver is too busy, this functions as cache. + s.quarantine.Add(res.payload.BlockHash, res) + s.quarantineByNum[uint64(res.payload.BlockNumber)] = res.payload.BlockHash + s.metrics.PayloadsQuarantineSize(s.quarantine.Len()) + // If we know this block is canonical, then promote it + if s.trusted.Contains(res.payload.BlockHash) { + s.promote(ctx, res) + } +} + +// peerLoop for syncing from a single peer +func (s *SyncClient) peerLoop(ctx context.Context, id peer.ID) { + defer func() { + s.peersLock.Lock() + delete(s.peers, id) // clean up + s.wg.Done() + s.peersLock.Unlock() + s.log.Debug("stopped syncing loop of peer", "id", id) + }() + + log := s.log.New("peer", id) + log.Info("Starting P2P sync client event loop") + + var rl rate.Limiter + + // Implement the same rate limits as the server does per-peer, + // so we don't be too aggressive to the server. + rl.SetLimit(peerServerBlocksRateLimit) + rl.SetBurst(peerServerBlocksBurst) + + for { + // wait for peer to be available for more work + if err := rl.WaitN(ctx, 1); err != nil { + return + } + + // once the peer is available, wait for a sync request. + select { + case pr := <-s.peerRequests: + // We already established the peer is available w.r.t. rate-limiting, + // and this is the only loop over this peer, so we can request now. + start := time.Now() + err := s.doRequest(ctx, id, pr.num) + if err != nil { + // mark as complete if there's an error: we are not sending any result and can complete immediately. + pr.complete.Store(true) + log.Warn("failed p2p sync request", "num", pr.num, "err", err) + // If we hit an error, then count it as many requests. + // We'd like to avoid making more requests for a while, to back off. + if err := rl.WaitN(ctx, clientErrRateCost); err != nil { + return + } + } else { + log.Debug("completed p2p sync request", "num", pr.num) + } + took := time.Since(start) + // TODO(CLI-3732): update scores: depending on the speed of the result, + // increase the p2p-sync part of the peer score + // (don't allow the score to grow indefinitely only based on this factor though) + + resultCode := byte(0) + if err != nil { + if re, ok := err.(requestResultErr); ok { + resultCode = re.ResultCode() + } else { + resultCode = 1 + } + } + s.metrics.ClientPayloadByNumberEvent(pr.num, resultCode, took) + case <-ctx.Done(): + return + } + } +} + +type requestResultErr byte + +func (r requestResultErr) Error() string { + return fmt.Sprintf("peer failed to serve request with code %d", uint8(r)) +} + +func (r requestResultErr) ResultCode() byte { + return byte(r) +} + +func (s *SyncClient) doRequest(ctx context.Context, id peer.ID, n uint64) error { + // open stream to peer + reqCtx, reqCancel := context.WithTimeout(ctx, streamTimeout) + str, err := s.newStreamFn(reqCtx, id, s.payloadByNumber) + reqCancel() + if err != nil { + return fmt.Errorf("failed to open stream: %w", err) + } + defer str.Close() + // set write timeout (if available) + _ = str.SetWriteDeadline(time.Now().Add(clientWriteRequestTimeout)) + if err := binary.Write(str, binary.LittleEndian, n); err != nil { + return fmt.Errorf("failed to write request (%d): %w", n, err) + } + if err := str.CloseWrite(); err != nil { + return fmt.Errorf("failed to close writer side while making request: %w", err) + } + + // set read timeout (if available) + _ = str.SetReadDeadline(time.Now().Add(clientReadResponsetimeout)) + + // Limit input, as well as output. + // Compression may otherwise continue to read ignored data for a small output, + // or output more data than desired (zip-bomb) + r := io.LimitReader(str, maxGossipSize) + var result [1]byte + if _, err := io.ReadFull(r, result[:]); err != nil { + return fmt.Errorf("failed to read result part of response: %w", err) + } + if res := result[0]; res != 0 { + return requestResultErr(res) + } + var versionData [4]byte + if _, err := io.ReadFull(r, versionData[:]); err != nil { + return fmt.Errorf("failed to read version part of response: %w", err) + } + version := binary.LittleEndian.Uint32(versionData[:]) + if version != 0 { + return fmt.Errorf("unrecognized ExecutionPayload version: %d", version) + } + // payload is SSZ encoded with Snappy framed compression + r = snappy.NewReader(r) + r = io.LimitReader(r, maxGossipSize) + // We cannot stream straight into the SSZ decoder, since we need the scope of the SSZ payload. + // The server does not prepend it, nor would we trust a claimed length anyway, so we buffer the data we get. + data, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("failed to read response: %w", err) + } + var res eth.ExecutionPayload + if err := res.UnmarshalSSZ(uint32(len(data)), bytes.NewReader(data)); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + if err := str.CloseRead(); err != nil { + return fmt.Errorf("failed to close reading side") + } + if err := verifyBlock(&res, n); err != nil { + return fmt.Errorf("received execution payload is invalid: %w", err) + } + select { + case s.results <- syncResult{payload: &res, peer: id}: + case <-ctx.Done(): + return fmt.Errorf("failed to process response, sync client is too busy: %w", err) + } + return nil +} + +func verifyBlock(payload *eth.ExecutionPayload, expectedNum uint64) error { + // verify L2 block + if expectedNum != uint64(payload.BlockNumber) { + return fmt.Errorf("received execution payload for block %d, but expected block %d", payload.BlockNumber, expectedNum) + } + actual, ok := payload.CheckBlockHash() + if !ok { // payload itself contains bad block hash + return fmt.Errorf("received execution payload for block %d with bad block hash %s, expected %s", expectedNum, payload.BlockHash, actual) + } + return nil +} + +// peerStat maintains rate-limiting data of a peer that requests blocks from us. +type peerStat struct { + // Requests tokenizes each request to sync + Requests *rate.Limiter +} + +type L2Chain interface { + PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) +} + +type ReqRespServerMetrics interface { + ServerPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) +} + +type ReqRespServer struct { + cfg *rollup.Config + + l2 L2Chain + + metrics ReqRespServerMetrics + + peerRateLimits *simplelru.LRU[peer.ID, *peerStat] + peerStatsLock sync.Mutex + + globalRequestsRL *rate.Limiter +} + +func NewReqRespServer(cfg *rollup.Config, l2 L2Chain, metrics ReqRespServerMetrics) *ReqRespServer { + // We should never allow over 1000 different peers to churn through quickly, + // so it's fine to prune rate-limit details past this. + + peerRateLimits, _ := simplelru.NewLRU[peer.ID, *peerStat](1000, nil) + // 3 sync requests per second, with 2 burst + globalRequestsRL := rate.NewLimiter(globalServerBlocksRateLimit, globalServerBlocksBurst) + + return &ReqRespServer{ + cfg: cfg, + l2: l2, + metrics: metrics, + peerRateLimits: peerRateLimits, + globalRequestsRL: globalRequestsRL, + } +} + +// HandleSyncRequest is a stream handler function to register the L2 unsafe payloads alt-sync protocol. +// See MakeStreamHandler to transform this into a LibP2P handler function. +// +// Note that the same peer may open parallel streams. +// +// The caller must Close the stream. +func (srv *ReqRespServer) HandleSyncRequest(ctx context.Context, log log.Logger, stream network.Stream) { + // may stay 0 if we fail to decode the request + start := time.Now() + + // We wait as long as necessary; we throttle the peer instead of disconnecting, + // unless the delay reaches a threshold that is unreasonable to wait for. + ctx, cancel := context.WithTimeout(ctx, maxThrottleDelay) + req, err := srv.handleSyncRequest(ctx, stream) + cancel() + + resultCode := byte(0) + if err != nil { + log.Warn("failed to serve p2p sync request", "req", req, "err", err) + if errors.Is(err, ethereum.NotFound) { + resultCode = 1 + } else if errors.Is(err, invalidRequestErr) { + resultCode = 2 + } else { + resultCode = 3 + } + // try to write error code, so the other peer can understand the reason for failure. + _, _ = stream.Write([]byte{resultCode}) + } else { + log.Debug("successfully served sync response", "req", req) + } + srv.metrics.ServerPayloadByNumberEvent(req, 0, time.Since(start)) +} + +var invalidRequestErr = errors.New("invalid request") + +func (srv *ReqRespServer) handleSyncRequest(ctx context.Context, stream network.Stream) (uint64, error) { + peerId := stream.Conn().RemotePeer() + + // take a token from the global rate-limiter, + // to make sure there's not too much concurrent server work between different peers. + if err := srv.globalRequestsRL.Wait(ctx); err != nil { + return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err) + } + + // find rate limiting data of peer, or add otherwise + srv.peerStatsLock.Lock() + ps, _ := srv.peerRateLimits.Get(peerId) + if ps == nil { + ps = &peerStat{ + Requests: rate.NewLimiter(peerServerBlocksRateLimit, peerServerBlocksBurst), + } + srv.peerRateLimits.Add(peerId, ps) + ps.Requests.Reserve() // count the hit, but make it delay the next request rather than immediately waiting + } else { + // Only wait if it's an existing peer, otherwise the instant rate-limit Wait call always errors. + + // If the requester thinks we're taking too long, then it's their problem and they can disconnect. + // We'll disconnect ourselves only when failing to read/write, + // if the work is invalid (range validation), or when individual sub tasks timeout. + if err := ps.Requests.Wait(ctx); err != nil { + return 0, fmt.Errorf("timed out waiting for global sync rate limit: %w", err) + } + } + srv.peerStatsLock.Unlock() + + // Set read deadline, if available + _ = stream.SetReadDeadline(time.Now().Add(serverReadRequestTimeout)) + + // Read the request + var req uint64 + if err := binary.Read(stream, binary.LittleEndian, &req); err != nil { + return 0, fmt.Errorf("failed to read requested block number: %w", err) + } + if err := stream.CloseRead(); err != nil { + return req, fmt.Errorf("failed to close reading-side of a P2P sync request call: %w", err) + } + + // Check the request is within the expected range of blocks + if req < srv.cfg.Genesis.L2.Number { + return req, fmt.Errorf("cannot serve request for L2 block %d before genesis %d: %w", req, srv.cfg.Genesis.L2.Number, invalidRequestErr) + } + max, err := srv.cfg.TargetBlockNumber(uint64(time.Now().Unix())) + if err != nil { + return req, fmt.Errorf("cannot determine max target block number to verify request: %w", invalidRequestErr) + } + if req > max { + return req, fmt.Errorf("cannot serve request for L2 block %d after max expected block (%v): %w", req, max, invalidRequestErr) + } + + payload, err := srv.l2.PayloadByNumber(ctx, req) + if err != nil { + if errors.Is(err, ethereum.NotFound) { + return req, fmt.Errorf("peer requested unknown block by number: %w", err) + } else { + return req, fmt.Errorf("failed to retrieve payload to serve to peer: %w", err) + } + } + + // We set write deadline, if available, to safely write without blocking on a throttling peer connection + _ = stream.SetWriteDeadline(time.Now().Add(serverWriteChunkTimeout)) + + // 0 - resultCode: success = 0 + // 1:5 - version: 0 + var tmp [5]byte + if _, err := stream.Write(tmp[:]); err != nil { + return req, fmt.Errorf("failed to write response header data: %w", err) + } + w := snappy.NewBufferedWriter(stream) + if _, err := payload.MarshalSSZ(w); err != nil { + return req, fmt.Errorf("failed to write payload to sync response: %w", err) + } + if err := w.Close(); err != nil { + return req, fmt.Errorf("failed to finishing writing payload to sync response: %w", err) + } + return req, nil +} diff --git a/op-node/p2p/sync_test.go b/op-node/p2p/sync_test.go new file mode 100644 index 0000000000000..68fd38c21e387 --- /dev/null +++ b/op-node/p2p/sync_test.go @@ -0,0 +1,254 @@ +package p2p + +import ( + "context" + "math" + "math/big" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/metrics" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/testlog" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" +) + +type mockPayloadFn func(n uint64) (*eth.ExecutionPayload, error) + +func (fn mockPayloadFn) PayloadByNumber(_ context.Context, number uint64) (*eth.ExecutionPayload, error) { + return fn(number) +} + +var _ L2Chain = mockPayloadFn(nil) + +func setupSyncTestData(length uint64) (*rollup.Config, map[uint64]*eth.ExecutionPayload, func(i uint64) eth.L2BlockRef) { + // minimal rollup config to build mock blocks & verify their time. + cfg := &rollup.Config{ + Genesis: rollup.Genesis{ + L1: eth.BlockID{Hash: common.Hash{0xaa}}, + L2: eth.BlockID{Hash: common.Hash{0xbb}}, + L2Time: 9000, + }, + BlockTime: 2, + L2ChainID: big.NewInt(1234), + } + + // create some simple fake test blocks + payloads := make(map[uint64]*eth.ExecutionPayload) + payloads[0] = ð.ExecutionPayload{ + Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time), + } + payloads[0].BlockHash, _ = payloads[0].CheckBlockHash() + for i := uint64(1); i <= length; i++ { + payload := ð.ExecutionPayload{ + ParentHash: payloads[i-1].BlockHash, + BlockNumber: eth.Uint64Quantity(i), + Timestamp: eth.Uint64Quantity(cfg.Genesis.L2Time + i*cfg.BlockTime), + } + payload.BlockHash, _ = payload.CheckBlockHash() + payloads[i] = payload + } + + l2Ref := func(i uint64) eth.L2BlockRef { + return eth.L2BlockRef{ + Hash: payloads[i].BlockHash, + Number: uint64(payloads[i].BlockNumber), + ParentHash: payloads[i].ParentHash, + Time: uint64(payloads[i].Timestamp), + } + } + return cfg, payloads, l2Ref +} + +func TestSinglePeerSync(t *testing.T) { + t.Parallel() // Takes a while, but can run in parallel + + log := testlog.Logger(t, log.LvlError) + + cfg, payloads, l2Ref := setupSyncTestData(25) + + // Serving payloads: just load them from the map, if they exist + servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) { + p, ok := payloads[n] + if !ok { + return nil, ethereum.NotFound + } + return p, nil + }) + + // collect received payloads in a buffered channel, so we can verify we get everything + received := make(chan *eth.ExecutionPayload, 100) + receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error { + received <- payload + return nil + }) + + // Setup 2 minimal test hosts to attach the sync protocol to + mnet, err := mocknet.FullMeshConnected(2) + require.NoError(t, err, "failed to setup mocknet") + defer mnet.Close() + hosts := mnet.Hosts() + hostA, hostB := hosts[0], hosts[1] + require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Setup host A as the server + srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics) + payloadByNumber := MakeStreamHandler(ctx, log.New("role", "server"), srv.HandleSyncRequest) + hostA.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber) + + // Setup host B as the client + cl := NewSyncClient(log.New("role", "client"), cfg, hostB.NewStream, receivePayload, metrics.NoopMetrics) + + // Setup host B (client) to sync from its peer Host A (server) + cl.AddPeer(hostA.ID()) + cl.Start() + defer cl.Close() + + // request to start syncing between 10 and 20 + require.NoError(t, cl.RequestL2Range(ctx, l2Ref(10), l2Ref(20))) + + // and wait for the sync results to come in (in reverse order) + receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*5) + defer receiveCancel() + for i := uint64(19); i > 10; i-- { + select { + case p := <-received: + require.Equal(t, uint64(p.BlockNumber), i, "expecting payloads in order") + exp, ok := payloads[uint64(p.BlockNumber)] + require.True(t, ok, "expecting known payload") + require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") + case <-receiveCtx.Done(): + t.Fatal("did not receive all expected payloads within expected time") + } + } +} + +func TestMultiPeerSync(t *testing.T) { + t.Parallel() // Takes a while, but can run in parallel + + log := testlog.Logger(t, log.LvlError) + + cfg, payloads, l2Ref := setupSyncTestData(100) + + setupPeer := func(ctx context.Context, h host.Host) (*SyncClient, chan *eth.ExecutionPayload) { + // Serving payloads: just load them from the map, if they exist + servePayload := mockPayloadFn(func(n uint64) (*eth.ExecutionPayload, error) { + p, ok := payloads[n] + if !ok { + return nil, ethereum.NotFound + } + return p, nil + }) + + // collect received payloads in a buffered channel, so we can verify we get everything + received := make(chan *eth.ExecutionPayload, 100) + receivePayload := receivePayloadFn(func(ctx context.Context, from peer.ID, payload *eth.ExecutionPayload) error { + received <- payload + return nil + }) + + // Setup as server + srv := NewReqRespServer(cfg, servePayload, metrics.NoopMetrics) + payloadByNumber := MakeStreamHandler(ctx, log.New("serve", "payloads_by_number"), srv.HandleSyncRequest) + h.SetStreamHandler(PayloadByNumberProtocolID(cfg.L2ChainID), payloadByNumber) + + cl := NewSyncClient(log.New("role", "client"), cfg, h.NewStream, receivePayload, metrics.NoopMetrics) + return cl, received + } + + // Setup 3 minimal test hosts to attach the sync protocol to + mnet, err := mocknet.FullMeshConnected(3) + require.NoError(t, err, "failed to setup mocknet") + defer mnet.Close() + hosts := mnet.Hosts() + hostA, hostB, hostC := hosts[0], hosts[1], hosts[2] + require.Equal(t, hostA.Network().Connectedness(hostB.ID()), network.Connected) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clA, recvA := setupPeer(ctx, hostA) + clB, recvB := setupPeer(ctx, hostB) + clC, _ := setupPeer(ctx, hostC) + + // Make them all sync from each other + clA.AddPeer(hostB.ID()) + clA.AddPeer(hostC.ID()) + clA.Start() + defer clA.Close() + clB.AddPeer(hostA.ID()) + clB.AddPeer(hostC.ID()) + clB.Start() + defer clB.Close() + clC.AddPeer(hostA.ID()) + clC.AddPeer(hostB.ID()) + clC.Start() + defer clC.Close() + + // request to start syncing between 10 and 100 + require.NoError(t, clA.RequestL2Range(ctx, l2Ref(10), l2Ref(90))) + + // With such large range to request we are going to hit the rate-limits of B and C, + // but that means we'll balance the work between the peers. + + // wait for the results to come in, based on the expected rate limit, divided by 2 (because we have 2 servers), with a buffer of 2 seconds + receiveCtx, receiveCancel := context.WithTimeout(ctx, time.Second*time.Duration(math.Ceil(float64((89-10)/peerServerBlocksRateLimit)))/2+time.Second*2) + defer receiveCancel() + for i := uint64(89); i > 10; i-- { + select { + case p := <-recvA: + exp, ok := payloads[uint64(p.BlockNumber)] + require.True(t, ok, "expecting known payload") + require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") + case <-receiveCtx.Done(): + t.Fatal("did not receive all expected payloads within expected time") + } + } + + // now see if B can sync a range, and fill the gap with a re-request + bl25 := payloads[25] // temporarily remove it from the available payloads. This will create a gap + delete(payloads, uint64(25)) + require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(30))) + for i := uint64(29); i > 25; i-- { + select { + case p := <-recvB: + exp, ok := payloads[uint64(p.BlockNumber)] + require.True(t, ok, "expecting known payload") + require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") + case <-receiveCtx.Done(): + t.Fatal("did not receive all expected payloads within expected time") + } + } + // the request for 25 should fail. See: + // server: WARN peer requested unknown block by number num=25 + // client: WARN failed p2p sync request num=25 err="peer failed to serve request with code 1" + require.Zero(t, len(recvB), "there is a gap, should not see other payloads yet") + // Add back the block + payloads[25] = bl25 + // And request a range again, 25 is there now, and 21-24 should follow quickly (some may already have been fetched and wait in quarantine) + require.NoError(t, clB.RequestL2Range(ctx, l2Ref(20), l2Ref(26))) + receiveCtx, receiveCancel = context.WithTimeout(ctx, time.Second*10) + defer receiveCancel() + for i := uint64(25); i > 20; i-- { + select { + case p := <-recvB: + exp, ok := payloads[uint64(p.BlockNumber)] + require.True(t, ok, "expecting known payload") + require.Equal(t, exp.BlockHash, p.BlockHash, "expecting the correct payload") + case <-receiveCtx.Done(): + t.Fatal("did not receive all expected payloads within expected time") + } + } +} diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 283476db9bbbc..e5b9063e3d5aa 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -678,22 +678,15 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System return io.EOF } -// GetUnsafeQueueGap retrieves the current [start, end) range (incl. start, excl. end) -// of the gap between the tip of the unsafe priority queue and the unsafe head. -// If there is no gap, the difference between end and start will be 0. -func (eq *EngineQueue) GetUnsafeQueueGap(expectedNumber uint64) (start uint64, end uint64) { - // The start of the gap is always the unsafe head + 1 - start = eq.unsafeHead.Number + 1 - - // If the priority queue is empty, the end is the first block number at the top of the priority queue - // Otherwise, the end is the expected block number +// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none. +func (eq *EngineQueue) UnsafeL2SyncTarget() eth.L2BlockRef { if first := eq.unsafePayloads.Peek(); first != nil { - // Don't include the payload we already have in the sync range - end = first.ID().Number + ref, err := PayloadToBlockRef(first, &eq.cfg.Genesis) + if err != nil { + return eth.L2BlockRef{} + } + return ref } else { - // Include the expected payload in the sync range - end = expectedNumber + 1 + return eth.L2BlockRef{} } - - return start, end } diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 5d53a1721fa79..4d68fa39177e2 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -51,7 +51,7 @@ type EngineQueueStage interface { Finalize(l1Origin eth.L1BlockRef) AddUnsafePayload(payload *eth.ExecutionPayload) - GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) + UnsafeL2SyncTarget() eth.L2BlockRef Step(context.Context) error } @@ -167,10 +167,9 @@ func (dp *DerivationPipeline) AddUnsafePayload(payload *eth.ExecutionPayload) { dp.eng.AddUnsafePayload(payload) } -// GetUnsafeQueueGap retrieves the current [start, end] range of the gap between the tip of the unsafe priority queue and the unsafe head. -// If there is no gap, the start and end will be 0. -func (dp *DerivationPipeline) GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) { - return dp.eng.GetUnsafeQueueGap(expectedNumber) +// UnsafeL2SyncTarget retrieves the first queued-up L2 unsafe payload, or a zeroed reference if there is none. +func (dp *DerivationPipeline) UnsafeL2SyncTarget() eth.L2BlockRef { + return dp.eng.UnsafeL2SyncTarget() } // Step tries to progress the buffer. diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 2881d6aa74e5a..b011e99abb33a 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -48,7 +48,7 @@ type DerivationPipeline interface { Reset() Step(ctx context.Context) error AddUnsafePayload(payload *eth.ExecutionPayload) - GetUnsafeQueueGap(expectedNumber uint64) (uint64, uint64) + UnsafeL2SyncTarget() eth.L2BlockRef Finalize(ref eth.L1BlockRef) FinalizedL1() eth.L1BlockRef Finalized() eth.L2BlockRef @@ -84,12 +84,20 @@ type Network interface { type AltSync interface { // RequestL2Range informs the sync source that the given range of L2 blocks is missing, // and should be retrieved from any available alternative syncing source. - // The start of the range is inclusive, the end is exclusive. + // The start and end of the range are exclusive: + // the start is the head we already have, the end is the first thing we have queued up. + // It's the task of the alt-sync mechanism to use this hint to fetch the right payloads. + // Note that the end and start may not be consistent: in this case the sync method should fetch older history + // + // If the end value is zeroed, then the sync-method may determine the end free of choice, + // e.g. sync till the chain head meets the wallclock time. This functionality is optional: + // a fixed target to sync towards may be determined by picking up payloads through P2P gossip or other sources. + // // The sync results should be returned back to the driver via the OnUnsafeL2Payload(ctx, payload) method. // The latest requested range should always take priority over previous requests. // There may be overlaps in requested ranges. // An error may be returned if the scheduling fails immediately, e.g. a context timeout. - RequestL2Range(ctx context.Context, start, end uint64) error + RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error } // NewDriver composes an events handler that tracks L1 state, triggers L2 derivation, and optionally sequences new L2 blocks. diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 6b9351675e0c9..79fd55f2af9a2 100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -422,6 +422,7 @@ func (s *Driver) syncStatus() *eth.SyncStatus { UnsafeL2: s.derivation.UnsafeL2Head(), SafeL2: s.derivation.SafeL2Head(), FinalizedL2: s.derivation.Finalized(), + UnsafeL2SyncTarget: s.derivation.UnsafeL2SyncTarget(), } } @@ -489,24 +490,14 @@ type hashAndErrorChannel struct { // WARNING: This is only an outgoing signal, the blocks are not guaranteed to be retrieved. // Results are received through OnUnsafeL2Payload. func (s *Driver) checkForGapInUnsafeQueue(ctx context.Context) error { - // subtract genesis time from wall clock to get the time elapsed since genesis, and then divide that - // difference by the block time to get the expected L2 block number at the current time. If the - // unsafe head does not have this block number, then there is a gap in the queue. - wallClock := uint64(time.Now().Unix()) - genesisTimestamp := s.config.Genesis.L2Time - if wallClock < genesisTimestamp { - s.log.Debug("nothing to sync, did not reach genesis L2 time yet", "genesis", genesisTimestamp) - return nil - } - wallClockGenesisDiff := wallClock - genesisTimestamp - // Note: round down, we should not request blocks into the future. - blocksSinceGenesis := wallClockGenesisDiff / s.config.BlockTime - expectedL2Block := s.config.Genesis.L2.Number + blocksSinceGenesis - - start, end := s.derivation.GetUnsafeQueueGap(expectedL2Block) - // Check if there is a gap between the unsafe head and the expected L2 block number at the current time. - if end > start { - s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end-start) + start := s.derivation.UnsafeL2Head() + end := s.derivation.UnsafeL2SyncTarget() + // Check if we have missing blocks between the start and end. Request them if we do. + if end == (eth.L2BlockRef{}) { + s.log.Debug("requesting sync with open-end range", "start", start) + return s.altSync.RequestL2Range(ctx, start, eth.L2BlockRef{}) + } else if end.Number > start.Number+1 { + s.log.Debug("requesting missing unsafe L2 block range", "start", start, "end", end, "size", end.Number-start.Number) return s.altSync.RequestL2Range(ctx, start, end) } return nil diff --git a/op-node/rollup/types.go b/op-node/rollup/types.go index 59317ec66d2d9..e5d4c50a3955f 100644 --- a/op-node/rollup/types.go +++ b/op-node/rollup/types.go @@ -116,6 +116,20 @@ func (cfg *Config) ValidateL2Config(ctx context.Context, client L2Client) error return nil } +func (cfg *Config) TargetBlockNumber(timestamp uint64) (num uint64, err error) { + // subtract genesis time from timestamp to get the time elapsed since genesis, and then divide that + // difference by the block time to get the expected L2 block number at the current time. If the + // unsafe head does not have this block number, then there is a gap in the queue. + genesisTimestamp := cfg.Genesis.L2Time + if timestamp < genesisTimestamp { + return 0, fmt.Errorf("did not reach genesis time (%d) yet", genesisTimestamp) + } + wallClockGenesisDiff := timestamp - genesisTimestamp + // Note: round down, we should not request blocks into the future. + blocksSinceGenesis := wallClockGenesisDiff / cfg.BlockTime + return cfg.Genesis.L2.Number + blocksSinceGenesis, nil +} + type L1Client interface { ChainID(context.Context) (*big.Int, error) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) diff --git a/op-node/sources/sync_client.go b/op-node/sources/sync_client.go index 8b1c7ae71f10b..3c983e86e28e9 100644 --- a/op-node/sources/sync_client.go +++ b/op-node/sources/sync_client.go @@ -32,9 +32,10 @@ type RPCSync interface { // Start starts an additional worker syncing job Start() error // RequestL2Range signals that the given range should be fetched, implementing the alt-sync interface. - RequestL2Range(ctx context.Context, start, end uint64) error + RequestL2Range(ctx context.Context, start uint64, end eth.L2BlockRef) error } +// SyncClient implements the driver AltSync interface, including support for fetching an open-ended chain of L2 blocks. type SyncClient struct { *L2Client @@ -88,7 +89,7 @@ func (s *SyncClient) Close() error { return nil } -func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) error { +func (s *SyncClient) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error { // Drain previous requests now that we have new information for len(s.requests) > 0 { select { // in case requests is being read at the same time, don't block on draining it. @@ -98,11 +99,23 @@ func (s *SyncClient) RequestL2Range(ctx context.Context, start, end uint64) erro } } + endNum := end.Number + if end == (eth.L2BlockRef{}) { + n, err := s.rollupCfg.TargetBlockNumber(uint64(time.Now().Unix())) + if err != nil { + return err + } + if n <= start.Number { + return nil + } + endNum = n + } + // TODO(CLI-3635): optimize the by-range fetching with the Engine API payloads-by-range method. - s.log.Info("Scheduling to fetch missing payloads from backup RPC", "start", start, "end", end, "size", end-start) + s.log.Info("Scheduling to fetch trailing missing payloads from backup RPC", "start", start, "end", endNum, "size", endNum-start.Number-1) - for i := start; i < end; i++ { + for i := start.Number + 1; i < endNum; i++ { select { case s.requests <- i: case <-ctx.Done(): diff --git a/specs/rollup-node-p2p.md b/specs/rollup-node-p2p.md index c5a8c005e5aed..58d1483947343 100644 --- a/specs/rollup-node-p2p.md +++ b/specs/rollup-node-p2p.md @@ -57,6 +57,8 @@ and are adopted by several other blockchains, most notably the [L1 consensus lay - [Block validation](#block-validation) - [Block processing](#block-processing) - [Block topic scoring parameters](#block-topic-scoring-parameters) +- [Req-Resp](#req-resp) + - [`payload_by_number`](#payload_by_number) @@ -305,12 +307,97 @@ A node may apply the block to their local engine ahead of L1 availability, if it TODO: GossipSub per-topic scoring to fine-tune incentives for ideal propagation delay and bandwidth usage. +## Req-Resp + +The op-node implements a similar request-response encoding for its sync protocols as the L1 ethereum Beacon-Chain. +See [L1 P2P-interface req-resp specification][eth2-p2p-reqresp] and [Altair P2P update][eth2-p2p-altair-reqresp]. + +However, the protocol is simplified, to avoid several issues seen in L1: + +- Error strings in responses, if there is any alternative response, + should not need to be compressed or have an artificial global length limit. +- Payload lengths should be fixed-length: byte-by-byte uvarint reading from the underlying stream is undesired. +- `` are relaxed to encode a `uint32`, rather than a beacon-chain `ForkDigest`. +- Payload-encoding may change per hardfork, so is not part of the protocol-ID. +- Usage of response-chunks is specific to the req-resp method: most basic req-resp does not need chunked responses. +- Compression is encouraged to be part of the payload-encoding, specific to the req-resp method, where necessary: + pings and such do not need streaming frame compression etc. + +And the protocol ID format follows the same scheme as L1, +except the trailing encoding schema part, which is now message-specific: + +```text +/ProtocolPrefix/MessageName/SchemaVersion/ +``` + +The req-resp protocols served by the op-node all have `/ProtocolPrefix` set to `/opstack/req`. + +Individual methods may include the chain ID as part of the `/MessageName` segment, +so it's immediately clear which chain the method applies to, if the communication is chain-specific. +Other methods may include chain-information in the request and/or response data, +such as the `ForkDigest` `` in L1 beacon chain req-resp protocols. + +Each segment starts with a `/`, and may contain multiple `/`, and the final protocol ID is suffixed with a `/`. + +### `payload_by_number` + +This is an optional chain syncing method, to request/serve execution payloads by number. +This serves as a method to fill gaps upon missed gossip, and sync short to medium ranges of unsafe L2 blocks. + +Protocol ID: `/opstack/req/payload_by_number//0/` + +- `/MessageName` is `/block_by_number/` where `` is set to the op-node L2 chain ID. +- `/SchemaVersion` is `/0` + +Request format: ``: a little-endian `uint64` - the block number to request. + +Response format: ` = ` + +- `` is a byte code describing the result. + - `0` on success, `` should follow. + - `1` if valid request, but unavailable payload. + - `2` if invalid request + - `3+` if other error + - The `>= 128` range is reserved for future use. +- `` is a little-endian `uint32`, identifying the type of `ExecutionPayload` (fork-specific) +- `` is an encoded block, read till stream EOF. + +The input of `` should be limited, as well as any generated decompressed output, +to avoid unexpected resource usage or zip-bomb type attacks. +A 10 MB limit is recommended, to ensure all blocks may be synced. +Implementations may opt for a different limit, since this sync method is optional. + +`` list: + +- `0`: SSZ-encoded `ExecutionPayload`, with Snappy framing compression, + matching the `ExecutionPayload` SSZ definition of the L1 Merge, L2 Bedrock and L2 Regolith versions. +- Other versions may be listed here with future network upgrades, such as the L1 Shanghai upgrade. + +The request is by block-number, enabling parallel fetching of a chain across many peers. + +A `res = 0` response should be verified to: + +- Have a block-number matching the requested block number. +- Have a consistent `blockhash` w.r.t. the other block contents. +- Build towards a known canonical block. + - This can be verified by checking if the parent-hash of a previous trusted canonical block matches + that of the verified hash of the retrieved block. + - For unsafe blocks this may be relaxed to verification against the parent-hash of any previously trusted block: + - The gossip validation process limits the amount of blocks that may be trusted to sync towards. + - The unsafe blocks should be queued for processing, the latest received L2 unsafe blocks should always + override any previous chain, until the final L2 chain can be reproduced from L1 data. + +A `res > 0` response code should not be accepted. The result code is helpful for debugging, +but the client should regard any error like any any other unanswered request, as the responding peer cannot be trusted. + ---- [libp2p]: https://libp2p.io/ [discv5]: https://github.com/ethereum/devp2p/blob/master/discv5/discv5.md [discv5-random-nodes]: https://pkg.go.dev/github.com/ethereum/go-ethereum@v1.10.12/p2p/discover#UDPv5.RandomNodes [eth2-p2p]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md +[eth2-p2p-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#the-reqresp-domain +[eth2-p2p-altair-reqresp]: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#the-reqresp-domain [libp2p-noise]: https://github.com/libp2p/specs/tree/master/noise [multistream-select]: https://github.com/multiformats/multistream-select/ [mplex]: https://github.com/libp2p/specs/tree/master/mplex