diff --git a/op-chain-ops/interopgen/deploy.go b/op-chain-ops/interopgen/deploy.go index 9937c5dbf4fa1..bf12da1962dbe 100644 --- a/op-chain-ops/interopgen/deploy.go +++ b/op-chain-ops/interopgen/deploy.go @@ -137,7 +137,7 @@ func CreateL1(logger log.Logger, fa *foundry.ArtifactsFS, srcFS *foundry.SourceM PrevRandao: cfg.L1GenesisBlockMixHash, BlobHashes: nil, } - l1Host := script.NewHost(logger.New("role", "l1", "chain", cfg.ChainID), fa, srcFS, l1Context, script.WithCreate2Deployer()) + l1Host := script.NewHost(logger.New("role", "l1", "chain", cfg.ChainID), fa, srcFS, l1Context, script.WithCreate2Deployer(), script.WithNoMaxCodeSize()) return l1Host } diff --git a/op-e2e/actions/helpers/l2_verifier.go b/op-e2e/actions/helpers/l2_verifier.go index c99e95e288b8a..00ec43d9c4e9a 100644 --- a/op-e2e/actions/helpers/l2_verifier.go +++ b/op-e2e/actions/helpers/l2_verifier.go @@ -136,7 +136,7 @@ func NewL2Verifier(t Testing, log log.Logger, l1 derive.L1Fetcher, var interopSys interop.SubSystem if cfg.InteropTime != nil { - mm := indexing.NewIndexingMode(log, cfg, "127.0.0.1", 0, interopJWTSecret, l1, eng, &opmetrics.NoopRPCMetrics{}) + mm := indexing.NewIndexingMode(log, cfg, "127.0.0.1", 0, interopJWTSecret, l1, eng, &opmetrics.NoopRPCMetrics{}, 5_000) mm.TestDisableEventDeduplication() interopSys = mm sys.Register("interop", interopSys, opts) diff --git a/op-e2e/actions/interop/dsl/dsl.go b/op-e2e/actions/interop/dsl/dsl.go index a5c132772f646..645c64efa428a 100644 --- a/op-e2e/actions/interop/dsl/dsl.go +++ b/op-e2e/actions/interop/dsl/dsl.go @@ -399,3 +399,34 @@ func (d *InteropDSL) ActSyncSupernode(t helpers.Testing, opts ...actSyncSupernod chain.Sequencer.ActL2PipelineFull(t) // node to complete syncing to L1 head. 
} } + +// SubmitBatchesAndAdvanceL1 submits the latest batches for all chains and synchronizes the supervisor with the latest data on L1 +func (d *InteropDSL) SubmitBatchesAndAdvanceL1(t helpers.Testing, l1BlockTimeSeconds uint64) { + d.Actors.ChainA.Batcher.ActSubmitAll(t) + d.Actors.ChainB.Batcher.ActSubmitAll(t) + + d.AdvanceL1( + func(o *AdvanceL1Opts) { + o.L1BlockTimeSeconds = l1BlockTimeSeconds + }, + WithActIncludeTx(d.Actors.L1Miner.ActL1IncludeTx(d.Actors.ChainA.BatcherAddr)), + WithActIncludeTx(d.Actors.L1Miner.ActL1IncludeTx(d.Actors.ChainB.BatcherAddr)), + ) + + for _, chain := range []*Chain{d.Actors.ChainA, d.Actors.ChainB} { + status := chain.Sequencer.SyncStatus() + require.Equalf(t, status.UnsafeL2, status.LocalSafeL2, "Chain %v did not fully advance local safe head", chain.ChainID) + chain.Sequencer.SyncSupervisor(t) + } + d.Actors.Supervisor.ProcessFull(t) +} + +// SubmitBatches checks if there are any pending batches to be submitted for either chain, +// and if so, submits them and advances L1 to process them. 
+func (d *InteropDSL) SubmitBatches(t helpers.Testing, l1BlockTimeSeconds uint64) { + statusA := d.Actors.ChainA.Sequencer.SyncStatus() + statusB := d.Actors.ChainB.Sequencer.SyncStatus() + if statusA.UnsafeL2.Number != statusA.LocalSafeL2.Number || statusB.UnsafeL2.Number != statusB.LocalSafeL2.Number { + d.SubmitBatchesAndAdvanceL1(t, l1BlockTimeSeconds) + } +} diff --git a/op-e2e/actions/interop/proofs_test.go b/op-e2e/actions/interop/proofs_test.go index 3913903a9657f..4553fcdbd9b6d 100644 --- a/op-e2e/actions/interop/proofs_test.go +++ b/op-e2e/actions/interop/proofs_test.go @@ -1398,6 +1398,109 @@ func TestInteropFaultProofs_DepositMessage_InvalidExecution(gt *testing.T) { runFppAndChallengerTests(gt, system, tests) } +func TestInteropFaultProofs_DeepCanonicalBlockQuery(gt *testing.T) { + const totalBlocks = 10_000 + const blocksPerBatch = 200 + const expiryTime = totalBlocks * 4 // to ensure xchain messages don't expire + + t := helpers.NewDefaultTesting(gt) + system := dsl.NewInteropDSL(t, dsl.SetMessageExpiryTime(expiryTime)) + actors := system.Actors + emitter := system.DeployEmitterContracts() + + // Keep L1 time aligned with L2 time, otherwise MaxSequencerDrift will stall block building. + // We assume both L2s share the same block time in this test. 
+ require.Equal(t, actors.ChainA.RollupCfg.BlockTime, actors.ChainB.RollupCfg.BlockTime) + l1BlockTimeSeconds := uint64(blocksPerBatch) * actors.ChainA.RollupCfg.BlockTime + require.NotZero(t, l1BlockTimeSeconds) + + var msg *dsl.Message + var emitBlockNum uint64 + for i := range totalBlocks { + if i == 10 { + actors.ChainA.Sequencer.ActL2StartBlock(t) + msg = dsl.NewMessage(system, actors.ChainA, emitter, "early message").Emit() + actors.ChainA.Sequencer.ActL2EndBlock(t) + msg.CheckEmitted() + emitBlockNum = actors.ChainA.Sequencer.L2Unsafe().Number + } else { + actors.ChainA.Sequencer.ActL2EmptyBlock(t) + } + actors.ChainB.Sequencer.ActL2EmptyBlock(t) + + if (i+1)%blocksPerBatch == 0 { + system.SubmitBatchesAndAdvanceL1(t, l1BlockTimeSeconds) + } + } + + // Flush any remaining unbatched blocks. + system.SubmitBatches(t, l1BlockTimeSeconds) + system.ProcessCrossSafe() + + // Ensure the supervisor has processed up to the current L1 head. + l1Head := eth.InfoToL1BlockRef(eth.HeaderBlockInfo(actors.L1Miner.L1Chain().CurrentBlock())) + supStatus, err := actors.Supervisor.SyncStatus(t.Ctx()) + require.NoError(t, err) + require.Equal(t, l1Head, supStatus.MinSyncedL1, "supervisor did not catch up to L1 head") + + // Now execute the old message on chain B + // During consolidation, the FPP will need to verify the canonical hash of the early block + // This is where EIP-2935 should provide O(distance/8191) efficiency instead of O(distance) + + actors.ChainA.Sequencer.ActL2StartBlock(t) + actors.ChainB.Sequencer.ActL2StartBlock(t) + msg.ExecuteOn(actors.ChainB) + actors.ChainA.Sequencer.ActL2EndBlock(t) + actors.ChainB.Sequencer.ActL2EndBlock(t) + system.SubmitBatchesAndAdvanceL1(t, l1BlockTimeSeconds) + + endTimestamp := actors.ChainB.Sequencer.L2Unsafe().Time + startTimestamp := endTimestamp - 1 + // Capture the optimistic state before cross-safe processing + preConsolidation := system.Outputs.TransitionState(startTimestamp, consolidateStep, + 
system.Outputs.OptimisticBlockAtTimestamp(actors.ChainA, endTimestamp), + system.Outputs.OptimisticBlockAtTimestamp(actors.ChainB, endTimestamp), + ).Marshal() + + // Process cross-safe to get the canonical end state + // This will verify the exec message and make the block cross-safe + system.ProcessCrossSafe() + msg.CheckExecuted() + + // Verify the exec block is now cross-safe (which implies cross-unsafe) before running FPP + chainBStatus := actors.ChainB.Sequencer.SyncStatus() + require.Equal(gt, chainBStatus.SafeL2.Number, chainBStatus.UnsafeL2.Number, + "Exec block should be cross-safe before running FPP") + crossSafeEnd := system.Outputs.SuperRoot(endTimestamp) + + // Run the FPP test to verify the consolidation step with RPC tracking + // The FPP will need to verify the canonical hash of the early message block + // EIP-2935 should make this efficient by using the history storage contract + test := &transitionTest{ + name: "Consolidate-DeepCanonicalQuery", + agreedClaim: preConsolidation, + disputedClaim: crossSafeEnd.Marshal(), + disputedTraceIndex: consolidateStep, + expectValid: true, + } + rpcTracker := fpHelpers.NewL2RPCTracker() + rpcTracker.ForceProxyMode() + runFppTestWithL2RPCTracking(gt, test, system.Actors, system.DepSet(), rpcTracker) + + // Verify EIP-2935 provides sublinear efficiency + // Without EIP-2935: would need O(N) queries for ~9900 block distance + // With EIP-2935: should need O(N/8191) queries - approximately 2-3 queries + blockDistance := totalBlocks - int(emitBlockNum) + maxExpectedLookups := blockDistance/params.HistoryServeWindow + 5 // +5 for buffer + snap := rpcTracker.Snapshot() + uniqueBlockFetches := rpcTracker.UniqueBlockFetches() + gt.Logf("Block distance: %d, unique L2 block fetches: %d, max expected: %d, rpc totals: %+v", + blockDistance, uniqueBlockFetches, maxExpectedLookups, snap.TotalByMethod) + require.Greater(gt, uniqueBlockFetches, 0, "Should have fetched at least one L2 block during consolidation") + 
require.LessOrEqual(gt, uniqueBlockFetches, maxExpectedLookups, + "EIP-2935 should provide sublinear efficiency, but observed too many L2 block fetches") +} + // Returns true if all tests passed, otherwise returns false func runFppAndChallengerTests(gt *testing.T, system *dsl.InteropDSL, tests []*transitionTest) bool { passed := true @@ -1415,6 +1518,10 @@ func runFppAndChallengerTests(gt *testing.T, system *dsl.InteropDSL, tests []*tr } func runFppTest(gt *testing.T, test *transitionTest, actors *dsl.InteropActors, depSet *depset.StaticConfigDependencySet) { + runFppTestWithL2RPCTracking(gt, test, actors, depSet, nil) +} + +func runFppTestWithL2RPCTracking(gt *testing.T, test *transitionTest, actors *dsl.InteropActors, depSet *depset.StaticConfigDependencySet, tracker *fpHelpers.L2RPCTracker) { t := helpers.SubTest(gt) if test.skipProgram { t.Skip("Not yet implemented") @@ -1433,13 +1540,19 @@ func runFppTest(gt *testing.T, test *transitionTest, actors *dsl.InteropActors, if proposalTimestamp == 0 { proposalTimestamp = actors.ChainA.Sequencer.L2Unsafe().Time } + params := []fpHelpers.FixtureInputParam{ + WithInteropEnabled(t, actors, depSet, test.agreedClaim, crypto.Keccak256Hash(test.disputedClaim), proposalTimestamp), + fpHelpers.WithL1Head(l1Head), + } + if tracker != nil { + params = append(params, fpHelpers.WithL2RPCTracker(tracker)) + } fpHelpers.RunFaultProofProgram( t, logger, actors.L1Miner, checkResult, - WithInteropEnabled(t, actors, depSet, test.agreedClaim, crypto.Keccak256Hash(test.disputedClaim), proposalTimestamp), - fpHelpers.WithL1Head(l1Head), + params..., ) } diff --git a/op-e2e/actions/proofs/helpers/env.go b/op-e2e/actions/proofs/helpers/env.go index d700763d7f79f..f708c924cf399 100644 --- a/op-e2e/actions/proofs/helpers/env.go +++ b/op-e2e/actions/proofs/helpers/env.go @@ -153,6 +153,13 @@ func WithL1Head(head common.Hash) FixtureInputParam { } } +// WithL2RPCTracker sets the L2RPCTracker to observe L2 JSON-RPC calls made by the program 
host. +func WithL2RPCTracker(tracker *L2RPCTracker) FixtureInputParam { + return func(f *FixtureInputs) { + f.L2RPCTracker = tracker + } +} + // RunFaultProofProgram runs the fault proof program for each state transition from genesis up to the provided l2 block num. func (env *L2FaultProofEnv) RunFaultProofProgramFromGenesis(t helpers.Testing, finalL2BlockNum uint64, checkResult CheckResult, fixtureInputParams ...FixtureInputParam) { l2ClaimBlockNum := uint64(0) diff --git a/op-e2e/actions/proofs/helpers/fixture.go b/op-e2e/actions/proofs/helpers/fixture.go index 7df8935ee6ea9..e7a7f8567d3a7 100644 --- a/op-e2e/actions/proofs/helpers/fixture.go +++ b/op-e2e/actions/proofs/helpers/fixture.go @@ -32,4 +32,8 @@ type FixtureInputs struct { InteropEnabled bool `toml:"use-interop"` L2Sources []*FaultProofProgramL2Source + + // L2RPCTracker is an optional observer for L2 JSON-RPC calls made by the host. + // It is not serialized as part of the test fixture inputs. + L2RPCTracker *L2RPCTracker `toml:"-"` } diff --git a/op-e2e/actions/proofs/helpers/l2_rpc_tracker.go b/op-e2e/actions/proofs/helpers/l2_rpc_tracker.go new file mode 100644 index 0000000000000..8ed58c52af548 --- /dev/null +++ b/op-e2e/actions/proofs/helpers/l2_rpc_tracker.go @@ -0,0 +1,264 @@ +package helpers + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "time" + + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" +) + +// L2RPCTracker observes JSON-RPC calls made by the fault proof program host. 
+// It supports both: +// - in-process tracking (via WrapRPC) +// - more realistic tracking using an HTTP proxy (via StartProxy) +type L2RPCTracker struct { + mu sync.Mutex + + totalByMethod map[string]int + + uniqueBlockByHash map[string]struct{} + uniqueBlockByNum map[string]struct{} + + forceProxy bool + httpClient *http.Client +} + +func NewL2RPCTracker() *L2RPCTracker { + return &L2RPCTracker{ + totalByMethod: make(map[string]int), + uniqueBlockByHash: make(map[string]struct{}), + uniqueBlockByNum: make(map[string]struct{}), + httpClient: &http.Client{Timeout: 30 * time.Second}, + } +} + +// ForceProxyMode makes the tracker collect data via the HTTP proxy. +func (t *L2RPCTracker) ForceProxyMode() { + t.mu.Lock() + defer t.mu.Unlock() + t.forceProxy = true +} + +func (t *L2RPCTracker) shouldUseProxy() bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.forceProxy +} + +// WrapRPC wraps an op-service client.RPC and increments counters for each call. +func (t *L2RPCTracker) WrapRPC(rpcClient client.RPC) client.RPC { + return &trackingRPC{inner: rpcClient, tracker: t} +} + +// StartProxy starts an HTTP JSON-RPC proxy in front of the upstream endpoint. +// The returned URL should be used in place of the upstream URL. 
+func (t *L2RPCTracker) StartProxy(upstreamURL string) (proxyURL string, closeFn func()) { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + _ = r.Body.Close() + + t.observeJSONRPCBody(body) + + req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, upstreamURL, bytes.NewReader(body)) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + req.Header = r.Header.Clone() + + httpClient := t.httpClient + if httpClient == nil { + httpClient = http.DefaultClient + } + resp, err := httpClient.Do(req) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + defer resp.Body.Close() + + for k, vals := range resp.Header { + for _, v := range vals { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + _, _ = io.Copy(w, resp.Body) + }) + + // Avoid hanging test runs if the upstream is unresponsive. + server := httptest.NewUnstartedServer(h) + server.Config.ReadHeaderTimeout = 5 * time.Second + server.Start() + return server.URL, server.Close +} + +type L2RPCSnapshot struct { + TotalByMethod map[string]int + UniqueGetBlockByHash int + UniqueGetBlockByNum int +} + +// Snapshot returns a snapshot of the current RPC call counts. +func (t *L2RPCTracker) Snapshot() L2RPCSnapshot { + t.mu.Lock() + defer t.mu.Unlock() + + out := L2RPCSnapshot{ + TotalByMethod: make(map[string]int, len(t.totalByMethod)), + UniqueGetBlockByHash: len(t.uniqueBlockByHash), + UniqueGetBlockByNum: len(t.uniqueBlockByNum), + } + for k, v := range t.totalByMethod { + out.TotalByMethod[k] = v + } + return out +} + +// UniqueBlockFetches returns the total number of unique block fetches by hash or number. 
+func (t *L2RPCTracker) UniqueBlockFetches() int { + t.mu.Lock() + defer t.mu.Unlock() + return len(t.uniqueBlockByHash) + len(t.uniqueBlockByNum) +} + +type trackingRPC struct { + inner client.RPC + tracker *L2RPCTracker +} + +// Close implements client.RPC +func (t *trackingRPC) Close() { t.inner.Close() } + +// CallContext implements client.RPC +func (t *trackingRPC) CallContext(ctx context.Context, result any, method string, args ...any) error { + t.tracker.observeCall(method, args) + return t.inner.CallContext(ctx, result, method, args...) +} + +// BatchCallContext implements client.RPC +func (t *trackingRPC) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { + for i := range b { + t.tracker.observeCall(b[i].Method, b[i].Args) + } + return t.inner.BatchCallContext(ctx, b) +} + +// Subscribe implements client.RPC +func (t *trackingRPC) Subscribe(ctx context.Context, namespace string, channel any, args ...any) (ethereum.Subscription, error) { + return t.inner.Subscribe(ctx, namespace, channel, args...) 
+} + +func (t *L2RPCTracker) observeCall(method string, args []any) { + t.mu.Lock() + defer t.mu.Unlock() + + t.totalByMethod[method]++ + switch method { + case "eth_getBlockByHash": + if len(args) == 0 { + return + } + if hash, ok := normalizeHashArg(args[0]); ok { + t.uniqueBlockByHash[hash] = struct{}{} + } + case "eth_getBlockByNumber": + if len(args) == 0 { + return + } + if num, ok := normalizeNumberArg(args[0]); ok { + t.uniqueBlockByNum[num] = struct{}{} + } + } +} + +func (t *L2RPCTracker) observeJSONRPCBody(body []byte) { + trimmed := bytes.TrimSpace(body) + if len(trimmed) == 0 { + return + } + if trimmed[0] == '[' { + var reqs []jsonrpcReq + if err := json.Unmarshal(trimmed, &reqs); err != nil { + return + } + for _, req := range reqs { + t.observeCall(req.Method, req.Params) + } + return + } + var req jsonrpcReq + if err := json.Unmarshal(trimmed, &req); err != nil { + return + } + t.observeCall(req.Method, req.Params) +} + +type jsonrpcReq struct { + Method string `json:"method"` + Params []any `json:"params"` +} + +func normalizeHashArg(arg any) (string, bool) { + switch v := arg.(type) { + case common.Hash: + if v == (common.Hash{}) { + return "", false + } + return strings.ToLower(v.Hex()), true + case *common.Hash: + if v == nil || *v == (common.Hash{}) { + return "", false + } + return strings.ToLower(v.Hex()), true + case string: + if !strings.HasPrefix(v, "0x") { + return "", false + } + return strings.ToLower(v), true + case map[string]any: + // in case the caller uses block-number-or-hash objects. + if bh, ok := v["blockHash"].(string); ok && strings.HasPrefix(bh, "0x") { + return strings.ToLower(bh), true + } + } + // fallback to types that implement Hex() string. 
+ type hexer interface{ Hex() string } + if h, ok := arg.(hexer); ok { + hex := h.Hex() + if strings.HasPrefix(hex, "0x") { + return strings.ToLower(hex), true + } + } + return "", false +} + +func normalizeNumberArg(arg any) (string, bool) { + switch v := arg.(type) { + case string: + // number is typically a hex quantity (e.g. "0x10"), but might be labels like "latest". + if !strings.HasPrefix(v, "0x") { + return "", false + } + return strings.ToLower(v), true + } + return "", false +} diff --git a/op-e2e/actions/proofs/helpers/runner.go b/op-e2e/actions/proofs/helpers/runner.go index 151d309d0d871..b143cb33e8a13 100644 --- a/op-e2e/actions/proofs/helpers/runner.go +++ b/op-e2e/actions/proofs/helpers/runner.go @@ -86,9 +86,21 @@ func RunFaultProofProgram(t helpers.Testing, logger log.Logger, l1 *helpers.L1Mi rollupCfgs := make([]*rollup.Config, 0, len(fixtureInputs.L2Sources)) l1chainConfig := l1.L1Chain().Config() l2Endpoints := make([]string, 0, len(fixtureInputs.L2Sources)) + var closeProxies []func() + defer func() { + for _, closeFn := range closeProxies { + closeFn() + } + }() for _, source := range fixtureInputs.L2Sources { rollupCfgs = append(rollupCfgs, source.Node.RollupCfg) - l2Endpoints = append(l2Endpoints, source.Engine.HTTPEndpoint()) + endpoint := source.Engine.HTTPEndpoint() + if fixtureInputs.L2RPCTracker != nil { + proxyURL, closeFn := fixtureInputs.L2RPCTracker.StartProxy(endpoint) + closeProxies = append(closeProxies, closeFn) + endpoint = proxyURL + } + l2Endpoints = append(l2Endpoints, endpoint) } err = RunKonaNative(t, workDir, rollupCfgs, l1chainConfig, l1.HTTPEndpoint(), fakeBeacon.BeaconAddr(), l2Endpoints, *fixtureInputs) @@ -120,8 +132,27 @@ func CreateInprocessPrefetcher( // Set up in-process L2 source var rpcClients []client.RPC - for _, source := range fixtureInputs.L2Sources { - rpcClients = append(rpcClients, source.Engine.RPCClient()) + for i, source := range fixtureInputs.L2Sources { + rpcClient := source.Engine.RPCClient() + 
if fixtureInputs.L2RPCTracker != nil { + if fixtureInputs.L2RPCTracker.shouldUseProxy() { + upstream := source.Engine.HTTPEndpoint() + proxyURL, closeProxy := fixtureInputs.L2RPCTracker.StartProxy(upstream) + proxyRPC, err := client.NewRPC(ctx, logger, proxyURL) + require.NoError(t, err, "failed to create proxy RPC client") + go func() { + <-ctx.Done() + proxyRPC.Close() + closeProxy() + }() + rpcClient = proxyRPC + logger.Debug("Using L2 RPC proxy for tracking", "index", i) + } else { + rpcClient = fixtureInputs.L2RPCTracker.WrapRPC(rpcClient) + logger.Debug("Wrapping L2 RPC client for tracking", "index", i) + } + } + rpcClients = append(rpcClients, rpcClient) } sources, err := prefetcher.NewRetryingL2Sources(ctx, logger, cfg.Rollups, rpcClients, nil) require.NoError(t, err, "failed to create L2 client") diff --git a/op-node/rollup/interop/config.go b/op-node/rollup/interop/config.go index ed884f94a3d0f..9712dae91aba4 100644 --- a/op-node/rollup/interop/config.go +++ b/op-node/rollup/interop/config.go @@ -12,6 +12,8 @@ import ( "github.com/ethereum-optimism/optimism/op-service/rpc" ) +const defaultIndexingEventQueueSize = 100 + type Config struct { // RPCAddr address to bind RPC server to, to serve external supervisor nodes. // Optional. This will soon be required: running op-node without supervisor is being deprecated. 
@@ -43,5 +45,5 @@ func (cfg *Config) Setup(ctx context.Context, logger log.Logger, rollupCfg *roll if err != nil { return nil, err } - return indexing.NewIndexingMode(logger, rollupCfg, cfg.RPCAddr, cfg.RPCPort, jwtSecret, l1, l2, m), nil + return indexing.NewIndexingMode(logger, rollupCfg, cfg.RPCAddr, cfg.RPCPort, jwtSecret, l1, l2, m, defaultIndexingEventQueueSize), nil } diff --git a/op-node/rollup/interop/indexing/system.go b/op-node/rollup/interop/indexing/system.go index 61a7d9c519465..cd80aec46ef4a 100644 --- a/op-node/rollup/interop/indexing/system.go +++ b/op-node/rollup/interop/indexing/system.go @@ -82,7 +82,7 @@ type IndexingMode struct { engineController EngineController } -func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, jwtSecret eth.Bytes32, l1 L1Source, l2 L2Source, m opmetrics.RPCMetricer) *IndexingMode { +func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, jwtSecret eth.Bytes32, l1 L1Source, l2 L2Source, m opmetrics.RPCMetricer, eventQueueSize int) *IndexingMode { log = log.With("mode", "indexing", "chainId", cfg.L2ChainID) ctx, cancel := context.WithCancel(context.Background()) out := &IndexingMode{ @@ -91,7 +91,7 @@ func NewIndexingMode(log log.Logger, cfg *rollup.Config, addr string, port int, l1: l1, l2: l2, jwtSecret: jwtSecret, - events: rpc.NewStream[supervisortypes.IndexingEvent](log, 100), + events: rpc.NewStream[supervisortypes.IndexingEvent](log, eventQueueSize), lastReset: newEventTimestamp[struct{}](100 * time.Millisecond), lastUnsafe: newEventTimestamp[eth.BlockID](100 * time.Millisecond),