diff --git a/op-e2e/interop/supersystem.go b/op-e2e/interop/supersystem.go index 8fc663f377a0b..158f5d61ec7fe 100644 --- a/op-e2e/interop/supersystem.go +++ b/op-e2e/interop/supersystem.go @@ -480,6 +480,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService { EnableAdmin: true, }, L2RPCs: []string{}, + L1RPC: s.l1.UserRPC().RPC(), Datadir: path.Join(s.t.TempDir(), "supervisor"), } depSet := make(map[supervisortypes.ChainID]*depset.StaticConfigDependency) @@ -536,10 +537,11 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) { s.hdWallet = s.prepareHDWallet() s.worldDeployment, s.worldOutput = s.prepareWorld(w) - // the supervisor and client are created first so that the L2s can use the supervisor + // L1 first so that the Supervisor and L2s can connect to it + s.beacon, s.l1 = s.prepareL1() + s.supervisor = s.prepareSupervisor() - s.beacon, s.l1 = s.prepareL1() s.l2s = s.prepareL2s() s.prepareContracts() diff --git a/op-supervisor/cmd/main_test.go b/op-supervisor/cmd/main_test.go index c61b6f3cb1ca1..06fcf78440dd7 100644 --- a/op-supervisor/cmd/main_test.go +++ b/op-supervisor/cmd/main_test.go @@ -16,6 +16,7 @@ import ( ) var ( + ValidL1RPC = "http://localhost:8545" ValidL2RPCs = []string{"http;//localhost:8545"} ValidDatadir = "./supervisor_test_datadir" ) @@ -38,7 +39,7 @@ func TestLogLevel(t *testing.T) { func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) { cfg := configForArgs(t, addRequiredArgs()) depSet := &depset.JsonDependencySetLoader{Path: "test"} - defaultCfgTempl := config.NewConfig(ValidL2RPCs, depSet, ValidDatadir) + defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir) defaultCfg := *defaultCfgTempl defaultCfg.Version = Version require.Equal(t, defaultCfg, *cfg) @@ -125,6 +126,7 @@ func toArgList(req map[string]string) []string { func requiredArgs() map[string]string { args := map[string]string{ + "--l1-rpc": ValidL1RPC, "--l2-rpcs": ValidL2RPCs[0], "--dependency-set": "test", "--datadir": ValidDatadir, diff --git a/op-supervisor/config/config.go b/op-supervisor/config/config.go index b06d0592593bd..9d733207eaf40 100644 --- a/op-supervisor/config/config.go +++ b/op-supervisor/config/config.go @@ -33,6 +33,8 @@ type Config struct { // requiring manual triggers for the backend to process anything. SynchronousProcessors bool + L1RPC string + L2RPCs []string Datadir string } @@ -56,7 +58,7 @@ func (c *Config) Check() error { // NewConfig creates a new config using default values whenever possible. // Required options with no suitable default are passed as parameters. -func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config { +func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config { return &Config{ LogConfig: oplog.DefaultCLIConfig(), MetricsConfig: opmetrics.DefaultCLIConfig(), @@ -64,6 +66,7 @@ func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir strin RPC: oprpc.DefaultCLIConfig(), DependencySetSource: depSet, MockRun: false, + L1RPC: l1RPC, L2RPCs: l2RPCs, Datadir: datadir, } diff --git a/op-supervisor/config/config_test.go b/op-supervisor/config/config_test.go index 0d354ca134a56..1e2f6f40c0047 100644 --- a/op-supervisor/config/config_test.go +++ b/op-supervisor/config/config_test.go @@ -67,5 +67,5 @@ func validConfig() *Config { panic(err) } // Should be valid using only the required arguments passed in via the constructor. - return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir") + return NewConfig("http://localhost:8545", []string{"http://localhost:8545"}, depSet, "./supervisor_testdir") } diff --git a/op-supervisor/flags/flags.go b/op-supervisor/flags/flags.go index 9b49823f304a5..d12695010a3cd 100644 --- a/op-supervisor/flags/flags.go +++ b/op-supervisor/flags/flags.go @@ -21,6 +21,11 @@ func prefixEnvVars(name string) []string { } var ( + L1RPCFlag = &cli.StringFlag{ + Name: "l1-rpc", + Usage: "L1 RPC source.", + EnvVars: prefixEnvVars("L1_RPC"), + } L2RPCsFlag = &cli.StringSliceFlag{ Name: "l2-rpcs", Usage: "L2 RPC sources.", @@ -46,6 +51,7 @@ var ( ) var requiredFlags = []cli.Flag{ + L1RPCFlag, L2RPCsFlag, DataDirFlag, DependencySetFlag, @@ -86,6 +92,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config { RPC: oprpc.ReadCLIConfig(ctx), DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)}, MockRun: ctx.Bool(MockRunFlag.Name), + L1RPC: ctx.String(L1RPCFlag.Name), L2RPCs: ctx.StringSlice(L2RPCsFlag.Name), Datadir: ctx.Path(DataDirFlag.Name), } diff --git a/op-supervisor/supervisor/backend/backend.go b/op-supervisor/supervisor/backend/backend.go index 7d368cb40f159..722cabcf3eba2 100644 --- a/op-supervisor/supervisor/backend/backend.go +++ b/op-supervisor/supervisor/backend/backend.go @@ -36,6 +36,9 @@ type SupervisorBackend struct { // chainDBs holds on to the DB indices for each chain chainDBs *db.ChainsDB + // l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration + l1Processor *processors.L1Processor + // chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor] @@ -125,6 +128,14 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf su.chainProcessors.Set(chainID, chainProcessor) } + if cfg.L1RPC != "" { + if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil { + return fmt.Errorf("failed to create L1 processor: %w", err) + } + } else { + su.logger.Warn("No L1 RPC configured, L1 processor will not be started") + } + // the config has some RPC connections to attach to the chain-processors for _, rpc := range cfg.L2RPCs { err := su.attachRPC(ctx, rpc) @@ -230,6 +241,38 @@ func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src pr return nil } +func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error { + su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr) + + logger := su.logger.New("l1-rpc", l1RPCAddr) + l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr) + if err != nil { + return fmt.Errorf("failed to setup L1 RPC: %w", err) + } + l1Client, err := sources.NewL1Client( + l1RPC, + su.logger, + nil, + // placeholder config for the L1 + sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100)) + if err != nil { + return fmt.Errorf("failed to setup L1 Client: %w", err) + } + su.AttachL1Source(l1Client) + return nil +} + +// attachL1Source attaches an L1 source to the L1 processor. +// If the L1 processor does not exist, it is created and started. +func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) { + if su.l1Processor == nil { + su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source) + su.l1Processor.Start() + } else { + su.l1Processor.AttachClient(source) + } +} + func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) { ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc) if err != nil { @@ -254,6 +297,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error { return fmt.Errorf("failed to resume chains db: %w", err) } + // start the L1 processor if it exists + if su.l1Processor != nil { + su.l1Processor.Start() + } + if !su.synchronousProcessors { // Make all the chain-processors run automatic background processing su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool { @@ -278,6 +326,12 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error { return errAlreadyStopped } su.logger.Info("Closing supervisor backend") + + // stop the L1 processor + if su.l1Processor != nil { + su.l1Processor.Stop() + } + // close all processors su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool { su.logger.Info("stopping chain processor", "chainID", id) diff --git a/op-supervisor/supervisor/backend/backend_test.go b/op-supervisor/supervisor/backend/backend_test.go index c104bf0bae5dc..a8fc4de438e7a 100644 --- a/op-supervisor/supervisor/backend/backend_test.go +++ b/op-supervisor/supervisor/backend/backend_test.go @@ -62,6 +62,7 @@ func TestBackendLifetime(t *testing.T) { require.NoError(t, err) t.Log("initialized!") + l1Src := &testutils.MockL1Source{} src := &testutils.MockL1Source{} blockX := eth.BlockRef{ @@ -77,6 +78,7 @@ func TestBackendLifetime(t *testing.T) { Time: blockX.Time + 2, } + b.AttachL1Source(l1Src) require.NoError(t, b.AttachProcessorSource(chainA, src)) require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900") diff --git a/op-supervisor/supervisor/backend/db/query.go b/op-supervisor/supervisor/backend/db/query.go index bbee01d71b4d0..5897ff3c9242b 100644 --- a/op-supervisor/supervisor/backend/db/query.go +++ b/op-supervisor/supervisor/backend/db/query.go @@ -30,6 +30,32 @@ func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) { return logDB.LatestSealedBlockNum() } +// LastCommonL1 returns the latest common L1 block between all chains in the database. +// it only considers block numbers, not hash. That's because the L1 source is the same for all chains +// this data can be used to determine the starting point for L1 processing +func (db *ChainsDB) LastCommonL1() (types.BlockSeal, error) { + common := types.BlockSeal{} + for _, chain := range db.depSet.Chains() { + ldb, ok := db.localDBs.Get(chain) + if !ok { + return types.BlockSeal{}, types.ErrUnknownChain + } + _, derivedFrom, err := ldb.Latest() + if err != nil { + return types.BlockSeal{}, fmt.Errorf("failed to determine Last Common L1: %w", err) + } + common = derivedFrom + // if the common block isn't yet set, + // or if the new common block is older than the current common block + // set the common block + if common == (types.BlockSeal{}) || + derivedFrom.Number < common.Number { + common = derivedFrom + } + } + return common, nil +} + func (db *ChainsDB) IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error { v, ok := db.crossUnsafe.Get(chainID) if !ok { diff --git a/op-supervisor/supervisor/backend/db/update.go b/op-supervisor/supervisor/backend/db/update.go index 7ae7fde58a8bd..92c78b408a389 100644 --- a/op-supervisor/supervisor/backend/db/update.go +++ b/op-supervisor/supervisor/backend/db/update.go @@ -1,6 +1,7 @@ package db import ( + "errors" "fmt" "github.com/ethereum/go-ethereum/common" @@ -86,3 +87,40 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error { db.logger.Info("Updated finalized L1", "finalizedL1", finalized) return nil } + +// RecordNewL1 records a new L1 block in the database. +// it uses the latest derived L2 block as the derived block for the new L1 block. +func (db *ChainsDB) RecordNewL1(ref eth.BlockRef) error { + for _, chain := range db.depSet.Chains() { + // get local derivation database + ldb, ok := db.localDBs.Get(chain) + if !ok { + return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain) + } + // get the latest derived and derivedFrom blocks + derivedFrom, derived, err := ldb.Latest() + if err != nil { + return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err) + } + // make a ref from the latest derived block + derivedParent, err := ldb.PreviousDerived(derived.ID()) + if errors.Is(err, types.ErrFuture) { + db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err) + } else if err != nil { + db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err) + return err + } + derivedRef := derived.MustWithParent(derivedParent.ID()) + // don't push the new L1 block if it's not newer than the latest derived block + if derivedFrom.Number >= ref.Number { + db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom) + continue + } + // the database is extended with the new L1 and the existing L2 + if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil { + db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err) + return err + } + } + return nil +} diff --git a/op-supervisor/supervisor/backend/processors/l1_processor.go b/op-supervisor/supervisor/backend/processors/l1_processor.go new file mode 100644 index 0000000000000..063acd056e0ae --- /dev/null +++ b/op-supervisor/supervisor/backend/processors/l1_processor.go @@ -0,0 +1,127 @@ +package processors + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" + "github.com/ethereum/go-ethereum/log" +) + +type chainsDB interface { + RecordNewL1(ref eth.BlockRef) error + LastCommonL1() (types.BlockSeal, error) +} + +type L1Source interface { + L1BlockRefByNumber(ctx context.Context, number uint64) (eth.L1BlockRef, error) +} + +type L1Processor struct { + log log.Logger + client L1Source + clientMu sync.Mutex + running atomic.Bool + + currentNumber uint64 + tickDuration time.Duration + + db chainsDB + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +func NewL1Processor(log log.Logger, cdb chainsDB, client L1Source) *L1Processor { + ctx, cancel := context.WithCancel(context.Background()) + return &L1Processor{ + client: client, + db: cdb, + log: log.New("service", "l1-processor"), + tickDuration: 6 * time.Second, + ctx: ctx, + cancel: cancel, + } +} + +func (p *L1Processor) AttachClient(client L1Source) { + p.clientMu.Lock() + defer p.clientMu.Unlock() + p.client = client +} + +func (p *L1Processor) Start() { + // if already running, do nothing + if p.running.Load() { + return + } + p.running.Store(true) + p.currentNumber = 0 + // if there is an issue getting the last common L1, default to starting from 0 + // consider making this a fatal error in the future once initialization is more robust + if lastL1, err := p.db.LastCommonL1(); err == nil { + p.currentNumber = lastL1.Number + } + p.wg.Add(1) + go p.worker() +} + +func (p *L1Processor) Stop() { + // if not running, do nothing + if !p.running.Load() { + return + } + p.cancel() + p.wg.Wait() + p.running.Store(false) +} + +// worker runs a loop that checks for new L1 blocks at a regular interval +func (p *L1Processor) worker() { + defer p.wg.Done() + delay := time.NewTicker(p.tickDuration) + for { + select { + case <-p.ctx.Done(): + return + case <-delay.C: + p.log.Debug("Checking for new L1 block", "current", p.currentNumber) + err := p.work() + if err != nil { + p.log.Warn("Failed to process L1", "err", err) + } + } + } +} + +// work checks for a new L1 block and processes it if found +// the starting point is set when Start is called, and blocks are processed searched incrementally +// if a new block is found, it is recorded in the database and the target number is updated +// in the future it will also kick of derivation management for the sync nodes +func (p *L1Processor) work() error { + p.clientMu.Lock() + defer p.clientMu.Unlock() + nextNumber := p.currentNumber + 1 + ref, err := p.client.L1BlockRefByNumber(p.ctx, nextNumber) + if err != nil { + return err + } + // record the new L1 block + p.log.Debug("Processing new L1 block", "block", ref) + err = p.db.RecordNewL1(ref) + if err != nil { + return err + } + + // go drive derivation on this new L1 input + // only possible once bidirectional RPC and new derivers are in place + // could do this as a function callback to a more appropriate driver + + // update the target number + p.currentNumber = nextNumber + return nil +} diff --git a/op-supervisor/supervisor/backend/processors/l1_processor_test.go b/op-supervisor/supervisor/backend/processors/l1_processor_test.go new file mode 100644 index 0000000000000..143ebb66b74ea --- /dev/null +++ b/op-supervisor/supervisor/backend/processors/l1_processor_test.go @@ -0,0 +1,107 @@ +package processors + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/testlog" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" +) + +type mockChainsDB struct { + recordNewL1Fn func(ref eth.BlockRef) error + lastCommonL1Fn func() (types.BlockSeal, error) +} + +func (m *mockChainsDB) RecordNewL1(ref eth.BlockRef) error { + if m.recordNewL1Fn != nil { + return m.recordNewL1Fn(ref) + } + return nil +} + +func (m *mockChainsDB) LastCommonL1() (types.BlockSeal, error) { + if m.lastCommonL1Fn != nil { + return m.lastCommonL1Fn() + } + return types.BlockSeal{}, nil +} + +type mockL1BlockRefByNumberFetcher struct { + l1BlockByNumberFn func() (eth.L1BlockRef, error) +} + +func (m *mockL1BlockRefByNumberFetcher) L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) { + if m.l1BlockByNumberFn != nil { + return m.l1BlockByNumberFn() + } + return eth.L1BlockRef{}, nil +} + +func TestL1Processor(t *testing.T) { + processorForTesting := func() *L1Processor { + ctx, cancel := context.WithCancel(context.Background()) + proc := &L1Processor{ + log: testlog.Logger(t, log.LvlInfo), + client: &mockL1BlockRefByNumberFetcher{}, + currentNumber: 0, + tickDuration: 1 * time.Second, + db: &mockChainsDB{}, + ctx: ctx, + cancel: cancel, + } + return proc + } + t.Run("Initializes LastCommonL1", func(t *testing.T) { + proc := processorForTesting() + proc.db.(*mockChainsDB).lastCommonL1Fn = func() (types.BlockSeal, error) { + return types.BlockSeal{Number: 10}, nil + } + // before starting, the current number should be 0 + require.Equal(t, uint64(0), proc.currentNumber) + proc.Start() + defer proc.Stop() + // after starting, the current number should still be 0 + require.Equal(t, uint64(10), proc.currentNumber) + }) + t.Run("Initializes LastCommonL1 at 0 if error", func(t *testing.T) { + proc := processorForTesting() + proc.db.(*mockChainsDB).lastCommonL1Fn = func() (types.BlockSeal, error) { + return types.BlockSeal{Number: 10}, fmt.Errorf("error") + } + // before starting, the current number should be 0 + require.Equal(t, uint64(0), proc.currentNumber) + proc.Start() + defer proc.Stop() + // the error means the current number should still be 0 + require.Equal(t, uint64(0), proc.currentNumber) + }) + t.Run("Records new L1", func(t *testing.T) { + proc := processorForTesting() + // return a new block number each time + num := uint64(0) + proc.client.(*mockL1BlockRefByNumberFetcher).l1BlockByNumberFn = func() (eth.L1BlockRef, error) { + defer func() { num++ }() + return eth.L1BlockRef{Number: num}, nil + } + // confirm that recordNewL1 is called for each block number received + called := uint64(0) + proc.db.(*mockChainsDB).recordNewL1Fn = func(ref eth.BlockRef) error { + require.Equal(t, called, ref.Number) + called++ + return nil + } + proc.Start() + defer proc.Stop() + require.Eventually(t, func() bool { + return called >= 1 && proc.currentNumber >= 1 + }, 10*time.Second, 100*time.Millisecond) + + }) + +}