Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions op-e2e/interop/supersystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ func (s *interopE2ESystem) prepareSupervisor() *supervisor.SupervisorService {
EnableAdmin: true,
},
L2RPCs: []string{},
L1RPC: s.l1.UserRPC().RPC(),
Datadir: path.Join(s.t.TempDir(), "supervisor"),
}
depSet := make(map[supervisortypes.ChainID]*depset.StaticConfigDependency)
Expand Down Expand Up @@ -536,10 +537,11 @@ func (s *interopE2ESystem) prepare(t *testing.T, w worldResourcePaths) {
s.hdWallet = s.prepareHDWallet()
s.worldDeployment, s.worldOutput = s.prepareWorld(w)

// the supervisor and client are created first so that the L2s can use the supervisor
// L1 first so that the Supervisor and L2s can connect to it
s.beacon, s.l1 = s.prepareL1()

s.supervisor = s.prepareSupervisor()

s.beacon, s.l1 = s.prepareL1()
s.l2s = s.prepareL2s()

s.prepareContracts()
Expand Down
4 changes: 3 additions & 1 deletion op-supervisor/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

var (
ValidL1RPC = "http://localhost:8545"
ValidL2RPCs = []string{"http;//localhost:8545"}
ValidDatadir = "./supervisor_test_datadir"
)
Expand All @@ -38,7 +39,7 @@ func TestLogLevel(t *testing.T) {
func TestDefaultCLIOptionsMatchDefaultConfig(t *testing.T) {
cfg := configForArgs(t, addRequiredArgs())
depSet := &depset.JsonDependencySetLoader{Path: "test"}
defaultCfgTempl := config.NewConfig(ValidL2RPCs, depSet, ValidDatadir)
defaultCfgTempl := config.NewConfig(ValidL1RPC, ValidL2RPCs, depSet, ValidDatadir)
defaultCfg := *defaultCfgTempl
defaultCfg.Version = Version
require.Equal(t, defaultCfg, *cfg)
Expand Down Expand Up @@ -125,6 +126,7 @@ func toArgList(req map[string]string) []string {

func requiredArgs() map[string]string {
args := map[string]string{
"--l1-rpc": ValidL1RPC,
"--l2-rpcs": ValidL2RPCs[0],
"--dependency-set": "test",
"--datadir": ValidDatadir,
Expand Down
5 changes: 4 additions & 1 deletion op-supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {
// requiring manual triggers for the backend to process anything.
SynchronousProcessors bool

L1RPC string

L2RPCs []string
Datadir string
}
Expand All @@ -56,14 +58,15 @@ func (c *Config) Check() error {

// NewConfig creates a new config using default values whenever possible.
// Required options with no suitable default are passed as parameters.
func NewConfig(l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
func NewConfig(l1RPC string, l2RPCs []string, depSet depset.DependencySetSource, datadir string) *Config {
return &Config{
LogConfig: oplog.DefaultCLIConfig(),
MetricsConfig: opmetrics.DefaultCLIConfig(),
PprofConfig: oppprof.DefaultCLIConfig(),
RPC: oprpc.DefaultCLIConfig(),
DependencySetSource: depSet,
MockRun: false,
L1RPC: l1RPC,
L2RPCs: l2RPCs,
Datadir: datadir,
}
Expand Down
2 changes: 1 addition & 1 deletion op-supervisor/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,5 @@ func validConfig() *Config {
panic(err)
}
// Should be valid using only the required arguments passed in via the constructor.
return NewConfig([]string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
return NewConfig("http://localhost:8545", []string{"http://localhost:8545"}, depSet, "./supervisor_testdir")
}
7 changes: 7 additions & 0 deletions op-supervisor/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func prefixEnvVars(name string) []string {
}

var (
L1RPCFlag = &cli.StringFlag{
Name: "l1-rpc",
Usage: "L1 RPC source.",
EnvVars: prefixEnvVars("L1_RPC"),
}
L2RPCsFlag = &cli.StringSliceFlag{
Name: "l2-rpcs",
Usage: "L2 RPC sources.",
Expand All @@ -46,6 +51,7 @@ var (
)

var requiredFlags = []cli.Flag{
L1RPCFlag,
L2RPCsFlag,
DataDirFlag,
DependencySetFlag,
Expand Down Expand Up @@ -86,6 +92,7 @@ func ConfigFromCLI(ctx *cli.Context, version string) *config.Config {
RPC: oprpc.ReadCLIConfig(ctx),
DependencySetSource: &depset.JsonDependencySetLoader{Path: ctx.Path(DependencySetFlag.Name)},
MockRun: ctx.Bool(MockRunFlag.Name),
L1RPC: ctx.String(L1RPCFlag.Name),
L2RPCs: ctx.StringSlice(L2RPCsFlag.Name),
Datadir: ctx.Path(DataDirFlag.Name),
}
Expand Down
54 changes: 54 additions & 0 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type SupervisorBackend struct {
// chainDBs holds on to the DB indices for each chain
chainDBs *db.ChainsDB

// l1Processor watches for new L1 blocks, updates the local-safe DB, and kicks off derivation orchestration
l1Processor *processors.L1Processor

// chainProcessors are notified of new unsafe blocks, and add the unsafe log events data into the events DB
chainProcessors locks.RWMap[types.ChainID, *processors.ChainProcessor]

Expand Down Expand Up @@ -125,6 +128,14 @@ func (su *SupervisorBackend) initResources(ctx context.Context, cfg *config.Conf
su.chainProcessors.Set(chainID, chainProcessor)
}

if cfg.L1RPC != "" {
if err := su.attachL1RPC(ctx, cfg.L1RPC); err != nil {
return fmt.Errorf("failed to create L1 processor: %w", err)
}
} else {
su.logger.Warn("No L1 RPC configured, L1 processor will not be started")
}

// the config has some RPC connections to attach to the chain-processors
for _, rpc := range cfg.L2RPCs {
err := su.attachRPC(ctx, rpc)
Expand Down Expand Up @@ -230,6 +241,38 @@ func (su *SupervisorBackend) AttachProcessorSource(chainID types.ChainID, src pr
return nil
}

func (su *SupervisorBackend) attachL1RPC(ctx context.Context, l1RPCAddr string) error {
su.logger.Info("attaching L1 RPC to L1 processor", "rpc", l1RPCAddr)

logger := su.logger.New("l1-rpc", l1RPCAddr)
l1RPC, err := client.NewRPC(ctx, logger, l1RPCAddr)
if err != nil {
return fmt.Errorf("failed to setup L1 RPC: %w", err)
}
l1Client, err := sources.NewL1Client(
l1RPC,
su.logger,
nil,
// placeholder config for the L1
sources.L1ClientSimpleConfig(true, sources.RPCKindBasic, 100))
if err != nil {
return fmt.Errorf("failed to setup L1 Client: %w", err)
}
su.AttachL1Source(l1Client)
return nil
}

// attachL1Source attaches an L1 source to the L1 processor.
// If the L1 processor does not exist, it is created and started.
func (su *SupervisorBackend) AttachL1Source(source processors.L1Source) {
if su.l1Processor == nil {
su.l1Processor = processors.NewL1Processor(su.logger, su.chainDBs, source)
su.l1Processor.Start()
} else {
su.l1Processor.AttachClient(source)
}
}

func clientForL2(ctx context.Context, logger log.Logger, rpc string) (client.RPC, types.ChainID, error) {
ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
if err != nil {
Expand All @@ -254,6 +297,11 @@ func (su *SupervisorBackend) Start(ctx context.Context) error {
return fmt.Errorf("failed to resume chains db: %w", err)
}

// start the L1 processor if it exists
if su.l1Processor != nil {
su.l1Processor.Start()
}

if !su.synchronousProcessors {
// Make all the chain-processors run automatic background processing
su.chainProcessors.Range(func(_ types.ChainID, processor *processors.ChainProcessor) bool {
Expand All @@ -278,6 +326,12 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
return errAlreadyStopped
}
su.logger.Info("Closing supervisor backend")

// stop the L1 processor
if su.l1Processor != nil {
su.l1Processor.Stop()
}

// close all processors
su.chainProcessors.Range(func(id types.ChainID, processor *processors.ChainProcessor) bool {
su.logger.Info("stopping chain processor", "chainID", id)
Expand Down
2 changes: 2 additions & 0 deletions op-supervisor/supervisor/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestBackendLifetime(t *testing.T) {
require.NoError(t, err)
t.Log("initialized!")

l1Src := &testutils.MockL1Source{}
src := &testutils.MockL1Source{}

blockX := eth.BlockRef{
Expand All @@ -77,6 +78,7 @@ func TestBackendLifetime(t *testing.T) {
Time: blockX.Time + 2,
}

b.AttachL1Source(l1Src)
require.NoError(t, b.AttachProcessorSource(chainA, src))

require.FileExists(t, filepath.Join(cfg.Datadir, "900", "log.db"), "must have logs DB 900")
Expand Down
26 changes: 26 additions & 0 deletions op-supervisor/supervisor/backend/db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,32 @@ func (db *ChainsDB) LatestBlockNum(chain types.ChainID) (num uint64, ok bool) {
return logDB.LatestSealedBlockNum()
}

// LastCommonL1 returns the latest common L1 block between all chains in the database.
// it only considers block numbers, not hash. That's because the L1 source is the same for all chains
// this data can be used to determine the starting point for L1 processing
func (db *ChainsDB) LastCommonL1() (types.BlockSeal, error) {
common := types.BlockSeal{}
for _, chain := range db.depSet.Chains() {
ldb, ok := db.localDBs.Get(chain)
if !ok {
return types.BlockSeal{}, types.ErrUnknownChain
}
_, derivedFrom, err := ldb.Latest()
if err != nil {
return types.BlockSeal{}, fmt.Errorf("failed to determine Last Common L1: %w", err)
}
common = derivedFrom
// if the common block isn't yet set,
// or if the new common block is older than the current common block
// set the common block
if common == (types.BlockSeal{}) ||
derivedFrom.Number < common.Number {
common = derivedFrom
}
}
return common, nil
}

func (db *ChainsDB) IsCrossUnsafe(chainID types.ChainID, block eth.BlockID) error {
v, ok := db.crossUnsafe.Get(chainID)
if !ok {
Expand Down
38 changes: 38 additions & 0 deletions op-supervisor/supervisor/backend/db/update.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -86,3 +87,40 @@ func (db *ChainsDB) UpdateFinalizedL1(finalized eth.BlockRef) error {
db.logger.Info("Updated finalized L1", "finalizedL1", finalized)
return nil
}

// RecordNewL1 records a new L1 block in the database.
// it uses the latest derived L2 block as the derived block for the new L1 block.
func (db *ChainsDB) RecordNewL1(ref eth.BlockRef) error {
for _, chain := range db.depSet.Chains() {
// get local derivation database
ldb, ok := db.localDBs.Get(chain)
if !ok {
return fmt.Errorf("cannot RecordNewL1 to chain %s: %w", chain, types.ErrUnknownChain)
}
// get the latest derived and derivedFrom blocks
derivedFrom, derived, err := ldb.Latest()
if err != nil {
return fmt.Errorf("failed to get latest derivedFrom for chain %s: %w", chain, err)
}
// make a ref from the latest derived block
derivedParent, err := ldb.PreviousDerived(derived.ID())
if errors.Is(err, types.ErrFuture) {
db.logger.Warn("Empty DB, Recording first L1 block", "chain", chain, "err", err)
} else if err != nil {
db.logger.Warn("Failed to get latest derivedfrom to insert new L1 block", "chain", chain, "err", err)
return err
}
derivedRef := derived.MustWithParent(derivedParent.ID())
// don't push the new L1 block if it's not newer than the latest derived block
if derivedFrom.Number >= ref.Number {
db.logger.Warn("L1 block has already been processed for this height", "chain", chain, "block", ref, "latest", derivedFrom)
continue
}
// the database is extended with the new L1 and the existing L2
if err = db.UpdateLocalSafe(chain, ref, derivedRef); err != nil {
db.logger.Error("Failed to update local safe", "chain", chain, "block", ref, "derived", derived, "err", err)
return err
}
}
return nil
}
Loading