diff --git a/op-e2e/interop/supersystem.go b/op-e2e/interop/supersystem.go index 8feb206d36d3f..b973a30d50cc6 100644 --- a/op-e2e/interop/supersystem.go +++ b/op-e2e/interop/supersystem.go @@ -419,7 +419,7 @@ func (s *interopE2ESystem) newL2(id string, l2Out *interopgen.L2Output) l2Set { func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { // Be verbose with op-supervisor, it's in early test phase logger := testlog.Logger(s.t, log.LevelDebug).New("role", "supervisor") - cfg := supervisorConfig.Config{ + cfg := &supervisorConfig.Config{ MetricsConfig: metrics.CLIConfig{ Enabled: false, }, @@ -441,9 +441,9 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { depSet := &depset.StaticConfigDependencySet{ Dependencies: make(map[supervisortypes.ChainID]*depset.StaticConfigDependency), } - for id := range s.l2s { - cfg.L2RPCs = append(cfg.L2RPCs, s.l2s[id].l2Geth.UserRPC().RPC()) - chainID := supervisortypes.ChainIDFromBig(s.l2s[id].chainID) + // Iterate over the L2 chain configs. The L2 nodes don't exist yet. + for _, l2Out := range s.worldOutput.L2s { + chainID := supervisortypes.ChainIDFromBig(l2Out.Genesis.Config.ChainID) depSet.Dependencies[chainID] = &depset.StaticConfigDependency{ ActivationTime: 0, HistoryMinTime: 0, @@ -451,7 +451,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { } cfg.DependencySetSource = depSet // Create the supervisor with the configuration - super, err := supervisor.SupervisorFromConfig(context.Background(), &cfg, logger) + super, err := supervisor.SupervisorFromConfig(context.Background(), cfg, logger) require.NoError(s.t, err) // Start the supervisor err = super.Start(context.Background()) @@ -495,7 +495,7 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) { ctx := context.Background() for _, l2 := range s.l2s { err := s.SupervisorClient().AddL2RPC(ctx, l2.l2Geth.UserRPC().RPC()) - require.NoError(s.t, err, "failed to add L2 RPC to supervisor", "error", err) + require.NoError(s.t, err, "failed to add L2 RPC to supervisor") } } diff --git a/op-supervisor/config/config.go b/op-supervisor/config/config.go index e35db705ad9c3..b06d0592593bd 100644 --- a/op-supervisor/config/config.go +++ b/op-supervisor/config/config.go @@ -29,6 +29,10 @@ type Config struct { // MockRun runs the service with a mock backend MockRun bool + // SynchronousProcessors disables background-workers, + // requiring manual triggers for the backend to process anything. + SynchronousProcessors bool + L2RPCs []string Datadir string } diff --git a/op-supervisor/metrics/metrics.go b/op-supervisor/metrics/metrics.go index e025d509ee287..0e6ca9a8da2c9 100644 --- a/op-supervisor/metrics/metrics.go +++ b/op-supervisor/metrics/metrics.go @@ -1,10 +1,10 @@ package metrics import ( - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" "github.com/prometheus/client_golang/prometheus" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) const Namespace = "op_supervisor" @@ -18,7 +18,7 @@ type Metricer interface { CacheAdd(chainID types.ChainID, label string, cacheSize int, evicted bool) CacheGet(chainID types.ChainID, label string, hit bool) - RecordDBEntryCount(chainID types.ChainID, count int64) + RecordDBEntryCount(chainID types.ChainID, kind string, count int64) RecordDBSearchEntriesRead(chainID types.ChainID, count int64) Document() []opmetrics.DocumentedMetric @@ -106,9 +106,10 @@ func NewMetrics(procName string) *Metrics { DBEntryCountVec: factory.NewGaugeVec(prometheus.GaugeOpts{ Namespace: ns, Name: "logdb_entries_current", - Help: "Current number of entries in the log database by chain ID", + Help: "Current number of entries in the database of specified kind and chain ID", }, []string{ "chain", + "kind", }), DBSearchEntriesReadVec: factory.NewHistogramVec(prometheus.HistogramOpts{ Namespace: ns, @@ -159,8 +160,8 @@ func (m *Metrics) CacheGet(chainID types.ChainID, label string, hit bool) { } } -func (m *Metrics) RecordDBEntryCount(chainID types.ChainID, count int64) { - m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID)).Set(float64(count)) +func (m *Metrics) RecordDBEntryCount(chainID types.ChainID, kind string, count int64) { + m.DBEntryCountVec.WithLabelValues(chainIDLabel(chainID), kind).Set(float64(count)) } func (m *Metrics) RecordDBSearchEntriesRead(chainID types.ChainID, count int64) { diff --git a/op-supervisor/metrics/noop.go b/op-supervisor/metrics/noop.go index 7fad61d4c15a1..7f162f6dc2b86 100644 --- a/op-supervisor/metrics/noop.go +++ b/op-supervisor/metrics/noop.go @@ -19,5 +19,5 @@ func (*noopMetrics) RecordUp() {} func (m *noopMetrics) CacheAdd(_ types.ChainID, _ string, _ int, _ bool) {} func (m *noopMetrics) CacheGet(_ types.ChainID, _ string, _ bool) {} -func (m *noopMetrics) RecordDBEntryCount(_ types.ChainID, _ int64) {} -func (m *noopMetrics) RecordDBSearchEntriesRead(_ types.ChainID, _ int64) {} +func (m *noopMetrics) RecordDBEntryCount(_ types.ChainID, _ string, _ int64) {} +func (m *noopMetrics) RecordDBSearchEntriesRead(_ types.ChainID, _ int64) {} diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index d223b017778c4..7a1eb2eaf78a1 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -18,7 +18,6 @@ import ( "github.com/ethereum-optimism/optimism/op-supervisor/config" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" - "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend" @@ -36,13 +35,22 @@ type SupervisorBackend struct { // Write = set of chains is changing. mu sync.RWMutex + // depSet is the dependency set that the backend uses to know about the chains it is indexing depSet depset.DependencySet - // db holds on to the DB indices for each chain - db *db.ChainsDB + // chainDBs holds on to the DB indices for each chain + chainDBs *db.ChainsDB // chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB chainProcessors map[types.ChainID]*processors.ChainProcessor + + // synchronousProcessors disables background-workers, + // requiring manual triggers for the backend to process anything. + synchronousProcessors bool + + // chainMetrics are used to track metrics for each chain + // they are reused for processors and databases of the same chain + chainMetrics map[types.ChainID]*chainMetrics } var _ frontend.Backend = (*SupervisorBackend)(nil) @@ -51,7 +59,7 @@ var errAlreadyStopped = errors.New("already stopped") func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) { // attempt to prepare the data directory - if err := prepDataDir(cfg.Datadir); err != nil { + if err := db.PrepDataDir(cfg.Datadir); err != nil { return nil, err } @@ -60,12 +68,12 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg if err != nil { return nil, fmt.Errorf("failed to load dependency set: %w", err) } + chains := depSet.Chains() - // create the chains db - chainsDB := db.NewChainsDB(logger) - - // create an empty map of chain monitors - chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(cfg.L2RPCs)) + // create initial per-chain resources + chainsDBs := db.NewChainsDB(logger) + chainProcessors := make(map[types.ChainID]*processors.ChainProcessor, len(chains)) + chainMetrics := make(map[types.ChainID]*chainMetrics, len(chains)) // create the supervisor backend super := &SupervisorBackend{ @@ -73,48 +81,102 @@ func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg m: m, dataDir: cfg.Datadir, depSet: depSet, + chainDBs: chainsDBs, chainProcessors: chainProcessors, - db: chainsDB, + chainMetrics: chainMetrics, + // For testing we can avoid running the processors. + synchronousProcessors: cfg.SynchronousProcessors, + } + + // Initialize the resources of the supervisor backend. + // Stop the supervisor if any of the resources fails to be initialized. + if err := super.initResources(ctx, cfg); err != nil { + err = fmt.Errorf("failed to init resources: %w", err) + return nil, errors.Join(err, super.Stop(ctx)) } - // from the RPC strings, have the supervisor backend create a chain monitor - // don't start the monitor yet, as we will start all monitors at once when Start is called + return super, nil +} + +// initResources initializes all the resources, such as DBs and processors for chains. +// An error may returned, without closing the thus-far initialized resources. +// Upon error the caller should call Stop() on the supervisor backend to clean up and release resources. +func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Config) error { + chains := su.depSet.Chains() + + // for each chain known to the dependency set, create the necessary DB resources + for _, chainID := range chains { + if err := su.openChainDBs(chainID); err != nil { + return fmt.Errorf("failed to open chain %s: %w", chainID, err) + } + } + + // for each chain initialize a chain processor service + for _, chainID := range chains { + logProcessor := processors.NewLogProcessor(chainID, su.chainDBs) + chainProcessor := processors.NewChainProcessor(su.logger, chainID, logProcessor, su.chainDBs) + su.chainProcessors[chainID] = chainProcessor + } + + // the config has some RPC connections to attach to the chain-processors for _, rpc := range cfg.L2RPCs { - err := super.addFromRPC(ctx, logger, rpc, false) + err := su.attachRPC(ctx, rpc) if err != nil { - return nil, fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err) + return fmt.Errorf("failed to add chain monitor for rpc %v: %w", rpc, err) } } - return super, nil + return nil } -// addFromRPC adds a chain monitor to the supervisor backend from an rpc endpoint -// it does not expect to be called after the backend has been started -// it will start the monitor if shouldStart is true -func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, rpc string, _ bool) error { - // create the rpc client, which yields the chain id - rpcClient, chainID, err := clientForL2(ctx, logger, rpc) +// openChainDBs initializes all the DB resources of a specific chain. +// It is a sub-task of initResources. +func (su *SupervisorBackend) openChainDBs(chainID types.ChainID) error { + cm := newChainMetrics(chainID, su.m) + // create metrics and a logdb for the chain + su.chainMetrics[chainID] = cm + + logDB, err := db.OpenLogDB(su.logger, chainID, su.dataDir, cm) if err != nil { - return err + return fmt.Errorf("failed to open logDB of chain %s: %w", chainID, err) } - su.logger.Info("adding from rpc connection", "rpc", rpc, "chainID", chainID) - // create metrics and a logdb for the chain - cm := newChainMetrics(chainID, su.m) - path, err := prepLogDBPath(chainID, su.dataDir) + su.chainDBs.AddLogDB(chainID, logDB) + + localDB, err := db.OpenLocalDerivedFromDB(su.logger, chainID, su.dataDir, cm) if err != nil { - return fmt.Errorf("failed to create datadir for chain %v: %w", chainID, err) + return fmt.Errorf("failed to open local derived-from DB of chain %s: %w", chainID, err) } - logDB, err := logs.NewFromFile(logger, cm, path, true) + su.chainDBs.AddLocalDerivedFromDB(chainID, localDB) + + crossDB, err := db.OpenCrossDerivedFromDB(su.logger, chainID, su.dataDir, cm) if err != nil { - return fmt.Errorf("failed to create logdb for chain %v at %v: %w", chainID, path, err) + return fmt.Errorf("failed to open cross derived-from DB of chain %s: %w", chainID, err) } - if su.chainProcessors[chainID] != nil { - return fmt.Errorf("chain monitor for chain %v already exists", chainID) + su.chainDBs.AddCrossDerivedFromDB(chainID, crossDB) + + su.chainDBs.AddCrossUnsafeTracker(chainID) + return nil +} + +func (su *SupervisorBackend) attachRPC(ctx context.Context, rpc string) error { + su.logger.Info("attaching RPC to chain processor", "rpc", rpc) + + logger := su.logger.New("rpc", rpc) + // create the rpc client, which yields the chain id + rpcClient, chainID, err := clientForL2(ctx, logger, rpc) + if err != nil { + return err + } + if !su.depSet.HasChain(chainID) { + return fmt.Errorf("chain %s is not part of the interop dependency set: %w", chainID, db.ErrUnknownChain) } - // create a client like the monitor would have + cm, ok := su.chainMetrics[chainID] + if !ok { + return fmt.Errorf("failed to find metrics for chain %v", chainID) + } + // create an RPC client that the processor can use cl, err := processors.NewEthClient( ctx, - logger, + logger.New("chain", chainID), cm, rpc, rpcClient, 2*time.Second, @@ -123,10 +185,18 @@ func (su *SupervisorBackend) addFromRPC(ctx context.Context, logger log.Logger, if err != nil { return err } - logProcessor := processors.NewLogProcessor(chainID, su.db) - chainProcessor := processors.NewChainProcessor(logger, cl, chainID, logProcessor, su.db) - su.chainProcessors[chainID] = chainProcessor - su.db.AddLogDB(chainID, logDB) + return su.AttachProcessorSource(chainID, cl) +} + +func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src processors.Source) error { + su.mu.RLock() + defer su.mu.RUnlock() + + proc, ok := su.chainProcessors[chainID] + if !ok { + return fmt.Errorf("unknown chain %s, cannot attach RPC to processor", chainID) + } + proc.SetSource(src) return nil } @@ -150,12 +220,20 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { if !su.started.CompareAndSwap(false, true) { return errors.New("already started") } + // initiate "ResumeFromLastSealedBlock" on the chains db, // which rewinds the database to the last block that is guaranteed to have been fully recorded - if err := su.db.ResumeFromLastSealedBlock(); err != nil { + if err := su.chainDBs.ResumeFromLastSealedBlock(); err != nil { return fmt.Errorf("failed to resume chains db: %w", err) } - // TODO(#12423): init background processors, de-dup with constructor + + if !su.synchronousProcessors { + // Make all the chain-processors run automatic background processing + for _, processor := range su.chainProcessors { + processor.StartBackground() + } + } + return nil } @@ -173,17 +251,15 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { } clear(su.chainProcessors) // close the databases - return su.db.Close() + return su.chainDBs.Close() } -// AddL2RPC adds a new L2 chain to the supervisor backend -// it stops and restarts the backend to add the new chain +// AddL2RPC attaches an RPC as the RPC for the given chain, overriding the previous RPC source, if any. func (su *SupervisorBackend) AddL2RPC(ctx context.Context, rpc string) error { - su.mu.Lock() - defer su.mu.Unlock() + su.mu.RLock() // read-lock: we only modify an existing chain, we don't add/remove chains + defer su.mu.RUnlock() - // start the monitor immediately, as the backend is assumed to already be running - return su.addFromRPC(ctx, su.logger, rpc, true) + return su.attachRPC(ctx, rpc) } // Query methods @@ -196,7 +272,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa chainID := identifier.ChainID blockNum := identifier.BlockNumber logIdx := identifier.LogIndex - _, err := su.db.Check(chainID, blockNum, uint32(logIdx), payloadHash) + _, err := su.chainDBs.Check(chainID, blockNum, uint32(logIdx), payloadHash) if errors.Is(err, entrydb.ErrFuture) { return types.LocalUnsafe, nil } @@ -206,7 +282,7 @@ func (su *SupervisorBackend) CheckMessage(identifier types.Identifier, payloadHa if err != nil { return types.Invalid, fmt.Errorf("failed to check log: %w", err) } - return su.db.Safest(chainID, blockNum, uint32(logIdx)) + return su.chainDBs.Safest(chainID, blockNum, uint32(logIdx)) } func (su *SupervisorBackend) CheckMessages( @@ -234,11 +310,11 @@ func (su *SupervisorBackend) UnsafeView(ctx context.Context, chainID types.Chain su.mu.RLock() defer su.mu.RUnlock() - head, err := su.db.LocalUnsafe(chainID) + head, err := su.chainDBs.LocalUnsafe(chainID) if err != nil { return types.ReferenceView{}, fmt.Errorf("failed to get local-unsafe head: %w", err) } - cross, err := su.db.CrossUnsafe(chainID) + cross, err := su.chainDBs.CrossUnsafe(chainID) if err != nil { return types.ReferenceView{}, fmt.Errorf("failed to get cross-unsafe head: %w", err) } @@ -255,11 +331,11 @@ func (su *SupervisorBackend) SafeView(ctx context.Context, chainID types.ChainID su.mu.RLock() defer su.mu.RUnlock() - _, localSafe, err := su.db.LocalSafe(chainID) + _, localSafe, err := su.chainDBs.LocalSafe(chainID) if err != nil { return types.ReferenceView{}, fmt.Errorf("failed to get local-safe head: %w", err) } - _, crossSafe, err := su.db.CrossSafe(chainID) + _, crossSafe, err := su.chainDBs.CrossSafe(chainID) if err != nil { return types.ReferenceView{}, fmt.Errorf("failed to get cross-safe head: %w", err) } @@ -276,14 +352,22 @@ func (su *SupervisorBackend) Finalized(ctx context.Context, chainID types.ChainI su.mu.RLock() defer su.mu.RUnlock() - return su.db.Finalized(chainID) + v, err := su.chainDBs.Finalized(chainID) + if err != nil { + return eth.BlockID{}, err + } + return v.ID(), nil } func (su *SupervisorBackend) DerivedFrom(ctx context.Context, chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) { su.mu.RLock() defer su.mu.RUnlock() - return su.db.DerivedFrom(chainID, derived) + v, err := su.chainDBs.DerivedFrom(chainID, derived) + if err != nil { + return eth.BlockID{}, err + } + return v.ID(), nil } // Update methods @@ -303,12 +387,12 @@ func (su *SupervisorBackend) UpdateLocalSafe(chainID types.ChainID, derivedFrom su.mu.RLock() defer su.mu.RUnlock() - return su.db.UpdateLocalSafe(chainID, derivedFrom, lastDerived) + return su.chainDBs.UpdateLocalSafe(chainID, derivedFrom, lastDerived) } func (su *SupervisorBackend) UpdateFinalizedL1(chainID types.ChainID, finalized eth.BlockRef) error { su.mu.RLock() defer su.mu.RUnlock() - return su.db.UpdateFinalizedL1(finalized) + return su.chainDBs.UpdateFinalizedL1(finalized) } diff --git a/op-supervisor/supervisor/backend/backend_test.go b/op-supervisor/supervisor/backend/backend_test.go new file mode 100644 index 0000000000000..5bdb49caaaf96 --- /dev/null +++ b/op-supervisor/supervisor/backend/backend_test.go @@ -0,0 +1,138 @@ +package backend + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + types2 "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-service/eth" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/oppprof" + oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum-optimism/optimism/op-supervisor/config" + "github.com/ethereum-optimism/optimism/op-supervisor/metrics" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/entrydb" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/depset" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +func TestBackendLifetime(t *testing.T) { + logger := testlog.Logger(t, log.LvlInfo) + m := metrics.NoopMetrics + dataDir := t.TempDir() + chainA := types.ChainIDFromUInt64(900) + chainB := types.ChainIDFromUInt64(901) + cfg := &config.Config{ + Version: "test", + LogConfig: oplog.CLIConfig{}, + MetricsConfig: opmetrics.CLIConfig{}, + PprofConfig: oppprof.CLIConfig{}, + RPC: oprpc.CLIConfig{}, + DependencySetSource: &depset.StaticConfigDependencySet{ + Dependencies: map[types.ChainID]*depset.StaticConfigDependency{ + chainA: { + ActivationTime: 42, + HistoryMinTime: 100, + }, + chainB: { + ActivationTime: 30, + HistoryMinTime: 20, + }, + }, + }, + SynchronousProcessors: true, + MockRun: false, + L2RPCs: nil, + Datadir: dataDir, + } + + b, err := NewSupervisorBackend(context.Background(), logger, m, cfg) + require.NoError(t, err) + t.Log("initialized!") + + src := &testutils.MockL1Source{} + + blockX := eth.BlockRef{ + Hash: common.Hash{0xaa}, + Number: 0, + ParentHash: common.Hash{}, // genesis has no parent hash + Time: 10000, + } + blockY := eth.BlockRef{ + Hash: common.Hash{0xbb}, + Number: blockX.Number + 1, + ParentHash: blockX.Hash, + Time: blockX.Time + 2, + } + + require.NoError(t, b.AttachProcessorSource(chainA, src)) + + require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900") + require.FileExists(t, filepath.Join(cfg.Datadir, "901", "log.db"), "must have logs DB 901") + require.FileExists(t, filepath.Join(cfg.Datadir, "900", "local_safe.db"), "must have local safe DB 900") + require.FileExists(t, filepath.Join(cfg.Datadir, "901", "local_safe.db"), "must have local safe DB 901") + require.FileExists(t, filepath.Join(cfg.Datadir, "900", "cross_safe.db"), "must have cross safe DB 900") + require.FileExists(t, filepath.Join(cfg.Datadir, "901", "cross_safe.db"), "must have cross safe DB 901") + + err = b.Start(context.Background()) + require.NoError(t, err) + t.Log("started!") + + _, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{}) + require.ErrorIs(t, err, entrydb.ErrFuture, "no data yet, need local-unsafe") + + src.ExpectL1BlockRefByNumber(0, blockX, nil) + src.ExpectFetchReceipts(blockX.Hash, &testutils.MockBlockInfo{ + InfoHash: blockX.Hash, + InfoParentHash: blockX.ParentHash, + InfoNum: blockX.Number, + InfoTime: blockX.Time, + InfoReceiptRoot: types2.EmptyReceiptsHash, + }, nil, nil) + + src.ExpectL1BlockRefByNumber(1, blockY, nil) + src.ExpectFetchReceipts(blockY.Hash, &testutils.MockBlockInfo{ + InfoHash: blockY.Hash, + InfoParentHash: blockY.ParentHash, + InfoNum: blockY.Number, + InfoTime: blockY.Time, + InfoReceiptRoot: types2.EmptyReceiptsHash, + }, nil, nil) + + src.ExpectL1BlockRefByNumber(2, eth.L1BlockRef{}, ethereum.NotFound) + + err = b.UpdateLocalUnsafe(chainA, blockY) + require.NoError(t, err) + // Make the processing happen, so we can rely on the new chain information, + // and not run into errors for future data that isn't mocked at this time. + b.chainProcessors[chainA].ProcessToHead() + + _, err = b.UnsafeView(context.Background(), chainA, types.ReferenceView{}) + require.ErrorIs(t, err, entrydb.ErrFuture, "still no data yet, need cross-unsafe") + + err = b.chainDBs.UpdateCrossUnsafe(chainA, types.BlockSeal{ + Hash: blockX.Hash, + Number: blockX.Number, + Timestamp: blockX.Time, + }) + require.NoError(t, err) + + v, err := b.UnsafeView(context.Background(), chainA, types.ReferenceView{}) + require.NoError(t, err, "have a functioning cross/local unsafe view now") + require.Equal(t, blockX.ID(), v.Cross) + require.Equal(t, blockY.ID(), v.Local) + + err = b.Stop(context.Background()) + require.NoError(t, err) + t.Log("stopped!") +} diff --git a/op-supervisor/supervisor/backend/chain_metrics.go b/op-supervisor/supervisor/backend/chain_metrics.go index e51dbabbd7a2f..a067ddc3bfc58 100644 --- a/op-supervisor/supervisor/backend/chain_metrics.go +++ b/op-supervisor/supervisor/backend/chain_metrics.go @@ -10,7 +10,7 @@ type Metrics interface { CacheAdd(chainID types.ChainID, label string, cacheSize int, evicted bool) CacheGet(chainID types.ChainID, label string, hit bool) - RecordDBEntryCount(chainID types.ChainID, count int64) + RecordDBEntryCount(chainID types.ChainID, kind string, count int64) RecordDBSearchEntriesRead(chainID types.ChainID, count int64) } @@ -36,8 +36,8 @@ func (c *chainMetrics) CacheGet(label string, hit bool) { c.delegate.CacheGet(c.chainID, label, hit) } -func (c *chainMetrics) RecordDBEntryCount(count int64) { - c.delegate.RecordDBEntryCount(c.chainID, count) +func (c *chainMetrics) RecordDBEntryCount(kind string, count int64) { + c.delegate.RecordDBEntryCount(c.chainID, kind, count) } func (c *chainMetrics) RecordDBSearchEntriesRead(count int64) { diff --git a/op-supervisor/supervisor/backend/db/db.go b/op-supervisor/supervisor/backend/db/db.go index b14019ffee1c7..ab8a9a652e5ba 100644 --- a/op-supervisor/supervisor/backend/db/db.go +++ b/op-supervisor/supervisor/backend/db/db.go @@ -10,11 +10,14 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) -var ErrUnknownChain = errors.New("unknown chain") +var ( + ErrUnknownChain = errors.New("unknown chain") +) type LogStorage interface { io.Closer @@ -46,12 +49,14 @@ type LogStorage interface { } type LocalDerivedFromStorage interface { - Last() (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) + Latest() (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) AddDerived(derivedFrom eth.BlockRef, derived eth.BlockRef) error - LastDerived(derivedFrom eth.BlockID) (derived eth.BlockID, err error) - DerivedFrom(derived eth.BlockID) (derivedFrom eth.BlockID, err error) + LastDerivedAt(derivedFrom eth.BlockID) (derived types.BlockSeal, err error) + DerivedFrom(derived eth.BlockID) (derivedFrom types.BlockSeal, err error) } +var _ LocalDerivedFromStorage = (*fromda.DB)(nil) + type CrossDerivedFromStorage interface { LocalDerivedFromStorage // This will start to differ with reorg support @@ -71,6 +76,7 @@ type ChainsDB struct { logDBs map[types.ChainID]LogStorage // cross-unsafe: how far we have processed the unsafe data. + // If present but set to a zeroed value the cross-unsafe will fallback to cross-safe. crossUnsafe map[types.ChainID]types.BlockSeal // local-safe: index of what we optimistically know about L2 blocks being derived from L1 @@ -97,14 +103,47 @@ func NewChainsDB(l log.Logger) *ChainsDB { } } -func (db *ChainsDB) AddLogDB(chain types.ChainID, logDB LogStorage) { +func (db *ChainsDB) AddLogDB(chainID types.ChainID, logDB LogStorage) { + db.mu.Lock() + defer db.mu.Unlock() + + if _, ok := db.logDBs[chainID]; ok { + db.logger.Warn("overwriting existing log DB for chain", "chain", chainID) + } + + db.logDBs[chainID] = logDB +} + +func (db *ChainsDB) AddLocalDerivedFromDB(chainID types.ChainID, dfDB LocalDerivedFromStorage) { + db.mu.Lock() + defer db.mu.Unlock() + + if _, ok := db.localDBs[chainID]; ok { + db.logger.Warn("overwriting existing local derived-from DB for chain", "chain", chainID) + } + + db.localDBs[chainID] = dfDB +} + +func (db *ChainsDB) AddCrossDerivedFromDB(chainID types.ChainID, dfDB CrossDerivedFromStorage) { + db.mu.Lock() + defer db.mu.Unlock() + + if _, ok := db.crossDBs[chainID]; ok { + db.logger.Warn("overwriting existing cross derived-from DB for chain", "chain", chainID) + } + + db.crossDBs[chainID] = dfDB +} + +func (db *ChainsDB) AddCrossUnsafeTracker(chainID types.ChainID) { db.mu.Lock() defer db.mu.Unlock() - if db.logDBs[chain] != nil { - log.Warn("overwriting existing logDB for chain", "chain", chain) + if _, ok := db.crossUnsafe[chainID]; ok { + db.logger.Warn("overwriting existing cross-unsafe tracker for chain", "chain", chainID) } - db.logDBs[chain] = logDB + db.crossUnsafe[chainID] = types.BlockSeal{} } // ResumeFromLastSealedBlock prepares the chains db to resume recording events after a restart. diff --git a/op-supervisor/supervisor/backend/file_layout.go b/op-supervisor/supervisor/backend/db/file_layout.go similarity index 59% rename from op-supervisor/supervisor/backend/file_layout.go rename to op-supervisor/supervisor/backend/db/file_layout.go index 2a94266ef6053..65a2823170dc4 100644 --- a/op-supervisor/supervisor/backend/file_layout.go +++ b/op-supervisor/supervisor/backend/db/file_layout.go @@ -1,4 +1,4 @@ -package backend +package db import ( "fmt" @@ -8,6 +8,22 @@ import ( "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) +func prepLocalDerivedFromDBPath(chainID types.ChainID, datadir string) (string, error) { + dir, err := prepChainDir(chainID, datadir) + if err != nil { + return "", err + } + return filepath.Join(dir, "local_safe.db"), nil +} + +func prepCrossDerivedFromDBPath(chainID types.ChainID, datadir string) (string, error) { + dir, err := prepChainDir(chainID, datadir) + if err != nil { + return "", err + } + return filepath.Join(dir, "cross_safe.db"), nil +} + func prepLogDBPath(chainID types.ChainID, datadir string) (string, error) { dir, err := prepChainDir(chainID, datadir) if err != nil { @@ -24,7 +40,7 @@ func prepChainDir(chainID types.ChainID, datadir string) (string, error) { return dir, nil } -func prepDataDir(datadir string) error { +func PrepDataDir(datadir string) error { if err := os.MkdirAll(datadir, 0755); err != nil { return fmt.Errorf("failed to create data directory %v: %w", datadir, err) } diff --git a/op-supervisor/supervisor/backend/file_layout_test.go b/op-supervisor/supervisor/backend/db/file_layout_test.go similarity index 98% rename from op-supervisor/supervisor/backend/file_layout_test.go rename to op-supervisor/supervisor/backend/db/file_layout_test.go index ae06c3cd6ea46..ab41448805678 100644 --- a/op-supervisor/supervisor/backend/file_layout_test.go +++ b/op-supervisor/supervisor/backend/db/file_layout_test.go @@ -1,4 +1,4 @@ -package backend +package db import ( "math/big" diff --git a/op-supervisor/supervisor/backend/db/fromda/db.go b/op-supervisor/supervisor/backend/db/fromda/db.go index 6c0df1421fc7d..7016fc6b25cb2 100644 --- a/op-supervisor/supervisor/backend/db/fromda/db.go +++ b/op-supervisor/supervisor/backend/db/fromda/db.go @@ -13,10 +13,6 @@ import ( "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) -type Metrics interface { - RecordDBDerivedEntryCount(count int64) -} - type EntryStore interface { Size() int64 LastEntryIdx() entrydb.EntryIdx diff --git a/op-supervisor/supervisor/backend/db/fromda/metrics.go b/op-supervisor/supervisor/backend/db/fromda/metrics.go new file mode 100644 index 0000000000000..f82bf0037ddb1 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/fromda/metrics.go @@ -0,0 +1,25 @@ +package fromda + +type Metrics interface { + RecordDBDerivedEntryCount(count int64) +} + +type ChainMetrics interface { + RecordDBEntryCount(kind string, count int64) +} + +type delegate struct { + inner ChainMetrics + kind string +} + +func (d *delegate) RecordDBDerivedEntryCount(count int64) { + d.inner.RecordDBEntryCount(d.kind, count) +} + +func AdaptMetrics(chainMetrics ChainMetrics, kind string) Metrics { + return &delegate{ + kind: kind, + inner: chainMetrics, + } +} diff --git a/op-supervisor/supervisor/backend/db/logs/db.go b/op-supervisor/supervisor/backend/db/logs/db.go index ca05bf99d70c7..0094997550595 100644 --- a/op-supervisor/supervisor/backend/db/logs/db.go +++ b/op-supervisor/supervisor/backend/db/logs/db.go @@ -21,7 +21,7 @@ const ( ) type Metrics interface { - RecordDBEntryCount(count int64) + RecordDBEntryCount(kind string, count int64) RecordDBSearchEntriesRead(count int64) } @@ -122,7 +122,7 @@ func (db *DB) trimToLastSealed() error { } func (db *DB) updateEntryCountMetric() { - db.m.RecordDBEntryCount(db.store.Size()) + db.m.RecordDBEntryCount("log", db.store.Size()) } func (db *DB) IteratorStartingAt(sealedNum uint64, logsSince uint32) (Iterator, error) { @@ -295,7 +295,7 @@ func (db *DB) newIteratorAt(blockNum uint64, logIndex uint32) (*iterator, error) }() // First walk up to the block that we are sealed up to (incl.) for { - if _, n, _ := iter.SealedBlock(); n == blockNum { // we may already have it exactly + if _, n, ok := iter.SealedBlock(); ok && n == blockNum { // we may already have it exactly break } if err := iter.NextBlock(); errors.Is(err, entrydb.ErrFuture) { diff --git a/op-supervisor/supervisor/backend/db/logs/db_test.go b/op-supervisor/supervisor/backend/db/logs/db_test.go index 02ab16adaca3f..718bc68a04f4b 100644 --- a/op-supervisor/supervisor/backend/db/logs/db_test.go +++ b/op-supervisor/supervisor/backend/db/logs/db_test.go @@ -1133,7 +1133,7 @@ type stubMetrics struct { entriesReadForSearch int64 } -func (s *stubMetrics) RecordDBEntryCount(count int64) { +func (s *stubMetrics) RecordDBEntryCount(kind string, count int64) { s.entryCount = count } diff --git a/op-supervisor/supervisor/backend/db/open.go b/op-supervisor/supervisor/backend/db/open.go new file mode 100644 index 0000000000000..972a466fb35e0 --- /dev/null +++ b/op-supervisor/supervisor/backend/db/open.go @@ -0,0 +1,47 @@ +package db + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/fromda" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +func OpenLogDB(logger log.Logger, chainID types.ChainID, dataDir string, m logs.Metrics) (*logs.DB, error) { + path, err := prepLogDBPath(chainID, dataDir) + if err != nil { + return nil, fmt.Errorf("failed to create datadir for chain %s: %w", chainID, err) + } + logDB, err := logs.NewFromFile(logger, m, path, true) + if err != nil { + return nil, fmt.Errorf("failed to create logdb for chain %s at %v: %w", chainID, path, err) + } + return logDB, nil +} + +func OpenLocalDerivedFromDB(logger log.Logger, chainID types.ChainID, dataDir string, m fromda.ChainMetrics) (*fromda.DB, error) { + path, err := prepLocalDerivedFromDBPath(chainID, dataDir) + if err != nil { + return nil, fmt.Errorf("failed to prepare datadir for chain %s: %w", chainID, err) + } + db, err := fromda.NewFromFile(logger, fromda.AdaptMetrics(m, "local_derived"), path) + if err != nil { + return nil, fmt.Errorf("failed to create local-derived for chain %s at %q: %w", chainID, path, err) + } + return db, nil +} + +func OpenCrossDerivedFromDB(logger log.Logger, chainID types.ChainID, dataDir string, m fromda.ChainMetrics) (*fromda.DB, error) { + path, err := prepCrossDerivedFromDBPath(chainID, dataDir) + if err != nil { + return nil, fmt.Errorf("failed to prepare datadir for chain %s: %w", chainID, err) + } + db, err := fromda.NewFromFile(logger, fromda.AdaptMetrics(m, "cross_derived"), path) + if err != nil { + return nil, fmt.Errorf("failed to create cross-derived for chain %s at %q: %w", chainID, path, err) + } + return db, nil +} diff --git a/op-supervisor/supervisor/backend/db/query.go b/op-supervisor/supervisor/backend/db/query.go index 074667e5d4b42..c331379f9a289 100644 --- a/op-supervisor/supervisor/backend/db/query.go +++ b/op-supervisor/supervisor/backend/db/query.go @@ -60,61 +60,69 @@ func (db *ChainsDB) CrossUnsafe(chainID types.ChainID) (types.BlockSeal, error) if !ok { return types.BlockSeal{}, ErrUnknownChain } + // Fall back to cross-safe if cross-unsafe is not known yet + if result == (types.BlockSeal{}) { + _, crossSafe, err := db.CrossSafe(chainID) + if err != nil { + return types.BlockSeal{}, fmt.Errorf("no cross-unsafe known for chain %s, and failed to fall back to cross-safe value: %w", chainID, err) + } + return crossSafe, nil + } return result, nil } -func (db *ChainsDB) LocalSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) { +func (db *ChainsDB) LocalSafe(chainID types.ChainID) (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) { db.mu.RLock() defer db.mu.RUnlock() localDB, ok := db.localDBs[chainID] if !ok { - return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain + return types.BlockSeal{}, types.BlockSeal{}, ErrUnknownChain } - return localDB.Last() + return localDB.Latest() } -func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom eth.BlockRef, derived eth.BlockRef, err error) { +func (db *ChainsDB) CrossSafe(chainID types.ChainID) (derivedFrom types.BlockSeal, derived types.BlockSeal, err error) { db.mu.RLock() defer db.mu.RUnlock() crossDB, ok := db.crossDBs[chainID] if !ok { - return eth.BlockRef{}, eth.BlockRef{}, ErrUnknownChain + return types.BlockSeal{}, types.BlockSeal{}, ErrUnknownChain } - return crossDB.Last() + return crossDB.Latest() } -func (db *ChainsDB) Finalized(chainID types.ChainID) (eth.BlockID, error) { +func (db *ChainsDB) Finalized(chainID types.ChainID) (types.BlockSeal, error) { db.mu.RLock() defer db.mu.RUnlock() finalizedL1 := db.finalizedL1 if finalizedL1 == (eth.L1BlockRef{}) { - return eth.BlockID{}, errors.New("no finalized L1 signal, cannot determine L2 finality yet") + return types.BlockSeal{}, errors.New("no finalized L1 signal, cannot determine L2 finality yet") } derived, err := db.LastDerivedFrom(chainID, finalizedL1.ID()) if err != nil { - return eth.BlockID{}, errors.New("could not find what was last derived from the finalized L1 block") + return types.BlockSeal{}, errors.New("could not find what was last derived from the finalized L1 block") } return derived, nil } -func (db *ChainsDB) LastDerivedFrom(chainID types.ChainID, derivedFrom eth.BlockID) (derived eth.BlockID, err error) { +func (db *ChainsDB) LastDerivedFrom(chainID types.ChainID, derivedFrom eth.BlockID) (derived types.BlockSeal, err error) { crossDB, ok := db.crossDBs[chainID] if !ok { - return eth.BlockID{}, ErrUnknownChain + return types.BlockSeal{}, ErrUnknownChain } - return crossDB.LastDerived(derivedFrom) + return crossDB.LastDerivedAt(derivedFrom) } -func (db *ChainsDB) DerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom eth.BlockID, err error) { +func (db *ChainsDB) DerivedFrom(chainID types.ChainID, derived eth.BlockID) (derivedFrom types.BlockSeal, err error) { db.mu.RLock() defer db.mu.RUnlock() localDB, ok := db.localDBs[chainID] if !ok { - return eth.BlockID{}, ErrUnknownChain + return types.BlockSeal{}, ErrUnknownChain } return localDB.DerivedFrom(derived) } diff --git a/op-supervisor/supervisor/backend/depset/depset.go b/op-supervisor/supervisor/backend/depset/depset.go index 14e09672f7be7..06127726f0007 100644 --- a/op-supervisor/supervisor/backend/depset/depset.go +++ b/op-supervisor/supervisor/backend/depset/depset.go @@ -25,4 +25,11 @@ type DependencySet interface { // This may return an error if the query temporarily cannot be answered. // E.g. if the DependencySet is syncing new changes. CanInitiateAt(chainID types.ChainID, initTimestamp uint64) (bool, error) + + // Chains returns the list of chains that are part of the dependency set. + Chains() []types.ChainID + + // HasChain determines if a chain is being tracked for interop purposes. + // See CanExecuteAt and CanInitiateAt to check if a chain may message at a given time. + HasChain(chainID types.ChainID) bool } diff --git a/op-supervisor/supervisor/backend/depset/depset_test.go b/op-supervisor/supervisor/backend/depset/depset_test.go index 583b9a4017d47..42dbb284d6e29 100644 --- a/op-supervisor/supervisor/backend/depset/depset_test.go +++ b/op-supervisor/supervisor/backend/depset/depset_test.go @@ -36,6 +36,12 @@ func TestDependencySet(t *testing.T) { result, err := loader.LoadDependencySet(context.Background()) require.NoError(t, err) + chainIDs := result.Chains() + require.Equal(t, []types.ChainID{ + types.ChainIDFromUInt64(900), + types.ChainIDFromUInt64(901), + }, chainIDs) + v, err := result.CanExecuteAt(types.ChainIDFromUInt64(900), 42) require.NoError(t, err) require.True(t, v) diff --git a/op-supervisor/supervisor/backend/depset/static.go b/op-supervisor/supervisor/backend/depset/static.go index e78bcd2516364..0ef2874bb527f 100644 --- a/op-supervisor/supervisor/backend/depset/static.go +++ b/op-supervisor/supervisor/backend/depset/static.go @@ -2,6 +2,9 @@ package depset import ( "context" + "sort" + + "golang.org/x/exp/maps" "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) @@ -46,3 +49,16 @@ func (ds *StaticConfigDependencySet) CanInitiateAt(chainID types.ChainID, initTi } return initTimestamp >= dep.HistoryMinTime, nil } + +func (ds *StaticConfigDependencySet) Chains() []types.ChainID { + out := maps.Keys(ds.Dependencies) + sort.Slice(out, func(i, j int) bool { + return out[i].Cmp(out[j]) < 0 + }) + return out +} + +func (ds *StaticConfigDependencySet) HasChain(chainID types.ChainID) bool { + _, ok := ds.Dependencies[chainID] + return ok +} diff --git a/op-supervisor/supervisor/backend/processors/chain_processor.go b/op-supervisor/supervisor/backend/processors/chain_processor.go index 9f7b630079e17..d27c26fe2bbc2 100644 --- a/op-supervisor/supervisor/backend/processors/chain_processor.go +++ b/op-supervisor/supervisor/backend/processors/chain_processor.go @@ -2,11 +2,13 @@ package processors import ( "context" + "errors" "fmt" "sync" "sync/atomic" "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -15,6 +17,8 @@ import ( "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" ) +var ErrNoRPCSource = errors.New("no RPC client configured") + type Source interface { L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, gethtypes.Receipts, error) @@ -38,8 +42,10 @@ func (fn BlockProcessorFn) ProcessBlock(ctx context.Context, block eth.BlockRef) // ChainProcessor is a HeadProcessor that fills in any skipped blocks between head update events. // It ensures that, absent reorgs, every block in the chain is processed even if some head advancements are skipped. type ChainProcessor struct { - log log.Logger - client Source + log log.Logger + + client Source + clientLock sync.Mutex chain types.ChainID @@ -51,8 +57,6 @@ type ChainProcessor struct { // channel with capacity of 1, full if there is work to do newHead chan struct{} - // bool to indicate if calls are synchronous - synchronous bool // channel with capacity of 1, to signal work complete if running in synchroneous mode out chan struct{} @@ -62,27 +66,37 @@ type ChainProcessor struct { wg sync.WaitGroup } -func NewChainProcessor(log log.Logger, client Source, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor { +func NewChainProcessor(log log.Logger, chain types.ChainID, processor LogProcessor, rewinder DatabaseRewinder) *ChainProcessor { ctx, cancel := context.WithCancel(context.Background()) out := &ChainProcessor{ - log: log, - client: client, + log: log.New("chain", chain), + client: nil, chain: chain, processor: processor, rewinder: rewinder, newHead: make(chan struct{}, 1), - // default to synchronous because we want other processors to wait for this - // in the future we could make this async and have a separate mechanism which forwards the work signal to other processors - synchronous: true, - out: make(chan struct{}, 1), - ctx: ctx, - cancel: cancel, + out: make(chan struct{}, 1), + ctx: ctx, + cancel: cancel, } - out.wg.Add(1) - go out.worker() return out } +func (s *ChainProcessor) SetSource(cl Source) { + s.clientLock.Lock() + defer s.clientLock.Unlock() + s.client = cl +} + +func (s *ChainProcessor) StartBackground() { + s.wg.Add(1) + go s.worker() +} + +func (s *ChainProcessor) ProcessToHead() { + s.work() +} + func (s *ChainProcessor) nextNum() uint64 { headNum, ok := s.rewinder.LatestBlockNum(s.chain) if !ok { @@ -106,11 +120,6 @@ func (s *ChainProcessor) worker() { case <-s.newHead: s.log.Debug("Responding to new head signal") s.work() - // if this chain processor is synchronous, signal completion - // to be picked up by the caller (ChainProcessor.OnNewHead) - if s.synchronous { - s.out <- struct{}{} - } case <-delay.C: s.log.Debug("Checking for updates") s.work() @@ -126,8 +135,14 @@ func (s *ChainProcessor) work() { } target := s.nextNum() if err := s.update(target); err != nil { - s.log.Error("Failed to process new block", "err", err) - // idle until next update trigger + if errors.Is(err, ethereum.NotFound) { + s.log.Info("Cannot find next block yet", "target", target) + } else if errors.Is(err, ErrNoRPCSource) { + s.log.Warn("No RPC source configured, cannot process new blocks") + } else { + s.log.Error("Failed to process new block", "err", err) + // idle until next update trigger + } } else if x := s.lastHead.Load(); target+1 <= x { s.log.Debug("Continuing with next block", "newTarget", target+1, "lastHead", x) continue // instantly continue processing, no need to idle @@ -139,6 +154,13 @@ func (s *ChainProcessor) work() { } func (s *ChainProcessor) update(nextNum uint64) error { + s.clientLock.Lock() + defer s.clientLock.Unlock() + + if s.client == nil { + return ErrNoRPCSource + } + ctx, cancel := context.WithTimeout(s.ctx, time.Second*10) nextL1, err := s.client.L1BlockRefByNumber(ctx, nextNum) next := eth.BlockRef{ @@ -185,10 +207,6 @@ func (s *ChainProcessor) OnNewHead(head eth.BlockRef) error { default: // already requested an update } - // if we are running synchronously, wait for the work to complete - if s.synchronous { - <-s.out - } return nil } diff --git a/op-supervisor/supervisor/types/types.go b/op-supervisor/supervisor/types/types.go index 9a82655f97532..05a9b16331264 100644 --- a/op-supervisor/supervisor/types/types.go +++ b/op-supervisor/supervisor/types/types.go @@ -182,6 +182,10 @@ func (id *ChainID) UnmarshalText(data []byte) error { return nil } +func (id ChainID) Cmp(other ChainID) int { + return (*uint256.Int)(&id).Cmp((*uint256.Int)(&other)) +} + type ReferenceView struct { Local eth.BlockID `json:"local"` Cross eth.BlockID `json:"cross"`