diff --git a/.github/workflows/docker-images.yml b/.github/workflows/docker-images.yml index 29c79b1aa42b1..e3140166c4cf3 100644 --- a/.github/workflows/docker-images.yml +++ b/.github/workflows/docker-images.yml @@ -2,17 +2,9 @@ name: Build and Push Docker Images on: push: - branches: [main, celo*] - paths: - - "espresso/docker/**" - - "espresso/docker-compose.yml" - - "config/**" + branches: + - "celo-integration*" pull_request: - branches: [main, celo*, integration] - paths: - - "espresso/docker/**" - - "espresso/docker-compose.yml" - - "config/**" workflow_dispatch: env: @@ -45,7 +37,7 @@ jobs: - name: Install Foundry uses: foundry-rs/foundry-toolchain@v1 with: - version: nightly-654c8f01721e43dbc8a53c7a3b022548cb82b2f9 # same as for the nix environment + version: nightly-654c8f01721e43dbc8a53c7a3b022548cb82b2f9 # same as for the nix environment - name: Install dasel run: | @@ -465,4 +457,3 @@ jobs: TARGET_BASE_IMAGE=alpine:3.22 TARGETOS=linux TARGETARCH=amd64 - diff --git a/.github/workflows/espresso-devnet-tests.yaml b/.github/workflows/espresso-devnet-tests.yaml index e8c8acc7262a1..8cbf2980e2d81 100644 --- a/.github/workflows/espresso-devnet-tests.yaml +++ b/.github/workflows/espresso-devnet-tests.yaml @@ -1,11 +1,8 @@ name: Run Espresso Devnet tests on: pull_request: - branches: - - "celo-integration*" push: branches: - - "master" - "celo-integration*" workflow_dispatch: @@ -57,7 +54,6 @@ jobs: - name: Run Challenge Game test run: go test -timeout 30m -p 1 -count 1 -run 'TestChallengeGame' -v ./espresso/devnet-tests/... - - name: Run Withdraw test run: go test -timeout 30m -p 1 -count 1 -run 'TestWithdrawal' -v ./espresso/devnet-tests/... diff --git a/.github/workflows/espresso-enclave.yaml b/.github/workflows/espresso-enclave.yaml index 163ffac245b1a..16b1cfd51bc7a 100644 --- a/.github/workflows/espresso-enclave.yaml +++ b/.github/workflows/espresso-enclave.yaml @@ -2,8 +2,6 @@ name: Run enclave tests on EC2 instance on: pull_request: - branches: - - "celo-integration*" push: branches: - "celo-integration*" @@ -19,7 +17,6 @@ jobs: timeout-minutes: 40 steps: - - name: Checkout repository uses: actions/checkout@v4 diff --git a/.github/workflows/espresso-integration.yaml b/.github/workflows/espresso-integration.yaml index 7c38e5f01b66d..9f33765b8735f 100644 --- a/.github/workflows/espresso-integration.yaml +++ b/.github/workflows/espresso-integration.yaml @@ -1,11 +1,8 @@ name: Run Espresso integration tests on: pull_request: - branches: - - "celo-integration*" push: branches: - - "master" - "celo-integration*" workflow_dispatch: diff --git a/espresso/cli.go b/espresso/cli.go index 89066fa728dc4..1fafa1f5abc58 100644 --- a/espresso/cli.go +++ b/espresso/cli.go @@ -8,7 +8,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli/v2" + + espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" + espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" ) // espressoFlags returns the flag names for espresso @@ -28,6 +33,9 @@ var ( LightClientAddrFlagName = espressoFlags("light-client-addr") L1UrlFlagName = espressoFlags("l1-url") TestingBatcherPrivateKeyFlagName = espressoFlags("testing-batcher-private-key") + OriginHeight = espressoFlags("origin-height") + NamespaceFlagName = espressoFlags("namespace") + RollupL1UrlFlagName = espressoFlags("rollup-l1-url") ) func CLIFlags(envPrefix string, category string) []cli.Flag { @@ -77,6 +85,24 @@ func CLIFlags(envPrefix string, category string) []cli.Flag { EnvVars: espressoEnvs(envPrefix, "TESTING_BATCHER_PRIVATE_KEY"), Category: category, }, + &cli.Uint64Flag{ + Name: OriginHeight, + Usage: "Espresso transactions below this height will not be considered", + EnvVars: espressoEnvs(envPrefix, "ORIGIN_HEIGHT"), + Category: category, + }, + &cli.Uint64Flag{ + Name: NamespaceFlagName, + Usage: "Namespace of Espresso transactions", + EnvVars: espressoEnvs(envPrefix, "NAMESPACE"), + Category: category, + }, + &cli.StringFlag{ + Name: RollupL1UrlFlagName, + Usage: "RPC URL of L1 backing the Rollup we're streaming for", + EnvVars: espressoEnvs(envPrefix, "ROLLUP_L1_URL"), + Category: category, + }, } } @@ -87,7 +113,10 @@ type CLIConfig struct { QueryServiceURLs []string LightClientAddr common.Address L1URL string + RollupL1URL string TestingBatcherPrivateKey *ecdsa.PrivateKey + Namespace uint64 + OriginHeight uint64 } func (c CLIConfig) Check() error { @@ -102,6 +131,12 @@ func (c CLIConfig) Check() error { if c.L1URL == "" { return fmt.Errorf("L1 URL is required when Espresso is enabled") } + if c.RollupL1URL == "" { + return fmt.Errorf("rollup L1 URL is required when Espresso is enabled") + } + if c.Namespace == 0 { + return fmt.Errorf("namespace is required when Espresso is enabled") + } } return nil } @@ -112,6 +147,9 @@ func ReadCLIConfig(c *cli.Context) CLIConfig { PollInterval: c.Duration(PollIntervalFlagName), UseFetchAPI: c.Bool(UseFetchApiFlagName), L1URL: c.String(L1UrlFlagName), + RollupL1URL: c.String(RollupL1UrlFlagName), + Namespace: c.Uint64(NamespaceFlagName), + OriginHeight: c.Uint64(OriginHeight), } config.QueryServiceURLs = c.StringSlice(QueryServiceUrlsFlagName) @@ -128,3 +166,48 @@ func ReadCLIConfig(c *cli.Context) CLIConfig { return config } + +func BatchStreamerFromCLIConfig[B Batch]( + cfg CLIConfig, + log log.Logger, + unmarshalBatch func([]byte) (*B, error), +) (*BatchStreamer[B], error) { + if !cfg.Enabled { + return nil, fmt.Errorf("Espresso is not enabled") + } + + l1Client, err := ethclient.Dial(cfg.L1URL) + if err != nil { + return nil, fmt.Errorf("failed to dial L1 RPC at %s: %w", cfg.L1URL, err) + } + + RollupL1Client, err := ethclient.Dial(cfg.RollupL1URL) + if err != nil { + return nil, fmt.Errorf("failed to dial Rollup L1 RPC at %s: %w", cfg.RollupL1URL, err) + } + + espressoClient, err := espressoClient.NewMultipleNodesClient(cfg.QueryServiceURLs) + if err != nil { + return nil, fmt.Errorf("failed to create Espresso client: %w", err) + } + + espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.LightClientAddr, l1Client) + if err != nil { + return nil, fmt.Errorf("failed to create Espresso light client") + } + + streamer := NewEspressoStreamer( + cfg.Namespace, + NewAdaptL1BlockRefClient(l1Client), + NewAdaptL1BlockRefClient(RollupL1Client), + espressoClient, + espressoLightClient, + log, + unmarshalBatch, + cfg.PollInterval, + cfg.OriginHeight, + ) + streamer.UseFetchApi = cfg.UseFetchAPI + + return streamer, nil +} diff --git a/espresso/environment/2_espresso_liveness_test.go b/espresso/environment/2_espresso_liveness_test.go index 1eeeba48fe7b0..6076392467454 100644 --- a/espresso/environment/2_espresso_liveness_test.go +++ b/espresso/environment/2_espresso_liveness_test.go @@ -13,7 +13,6 @@ import ( espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" "github.com/ethereum-optimism/optimism/espresso" env "github.com/ethereum-optimism/optimism/espresso/environment" - "github.com/ethereum-optimism/optimism/op-batcher/batcher" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" "github.com/ethereum-optimism/optimism/op-e2e/system/e2esys" "github.com/ethereum-optimism/optimism/op-e2e/system/helpers" @@ -262,7 +261,8 @@ func TestE2eDevnetWithEspressoDegradedLivenessViaCaffNode(t *testing.T) { require.NoError(t, err, "light client creation failed") streamer := espresso.NewEspressoStreamer( system.RollupConfig.L2ChainID.Uint64(), - batcher.NewAdaptL1BlockRefClient(l1Client), + espresso.NewAdaptL1BlockRefClient(l1Client), + espresso.NewAdaptL1BlockRefClient(l1Client), espressoClient.NewClient(server.URL), lightClient, l, @@ -270,6 +270,7 @@ func TestE2eDevnetWithEspressoDegradedLivenessViaCaffNode(t *testing.T) { return derive.UnmarshalEspressoTransaction(b, system.RollupConfig.Genesis.SystemConfig.BatcherAddr) }, 100*time.Millisecond, + 0, ) l1Client, _ := client.NewRPC(streamBlocksCtx, l, system.NodeEndpoint(e2esys.RoleL1).RPC()) diff --git a/espresso/environment/enclave_helpers.go b/espresso/environment/enclave_helpers.go index 1ea7b80768de6..03abbde8b0190 100644 --- a/espresso/environment/enclave_helpers.go +++ b/espresso/environment/enclave_helpers.go @@ -96,6 +96,7 @@ func LaunchBatcherInEnclave() E2eDevnetLauncherOption { appendArg(&args, flags.L1EthRpcFlag.Name, l1Rpc) appendArg(&args, txmgr.L1RPCFlagName, l1Rpc) appendArg(&args, espresso.L1UrlFlagName, l1Rpc) + appendArg(&args, espresso.RollupL1UrlFlagName, l1Rpc) l2EthRpc := sys.EthInstances[e2esys.RoleSeq].UserRPC().(endpoint.HttpRPC).HttpRPC() appendArg(&args, flags.L2EthRpcFlag.Name, l2EthRpc) rollupRpc := sys.RollupNodes[e2esys.RoleSeq].UserRPC().(endpoint.HttpRPC).HttpRPC() diff --git a/espresso/environment/espresso_caff_node.go b/espresso/environment/espresso_caff_node.go index 8ec03f2847c75..c84dd15cb0344 100644 --- a/espresso/environment/espresso_caff_node.go +++ b/espresso/environment/espresso_caff_node.go @@ -118,6 +118,7 @@ func LaunchCaffNode(t *testing.T, system *e2esys.System, espressoDevNode Espress // To create a valid multiple nodes client, we need to provide at least 2 URLs. QueryServiceURLs: []string{u.String(), u.String()}, L1URL: system.L1.UserRPC().RPC(), + RollupL1URL: system.L1.UserRPC().RPC(), LightClientAddr: common.HexToAddress(ESPRESSO_LIGHT_CLIENT_ADDRESS), } diff --git a/espresso/ethclient.go b/espresso/ethclient.go new file mode 100644 index 0000000000000..b4db1d3615d98 --- /dev/null +++ b/espresso/ethclient.go @@ -0,0 +1,31 @@ +package espresso + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// AdaptL1BlockRefClient is a wrapper around eth.L1BlockRef that implements the espresso.L1Client interface +type AdaptL1BlockRefClient struct { + L1Client *ethclient.Client +} + +// NewAdaptL1BlockRefClient creates a new L1BlockRefClient +func NewAdaptL1BlockRefClient(L1Client *ethclient.Client) *AdaptL1BlockRefClient { + return &AdaptL1BlockRefClient{ + L1Client: L1Client, + } +} + +// HeaderHashByNumber implements the espresso.L1Client interface +func (c *AdaptL1BlockRefClient) HeaderHashByNumber(ctx context.Context, number *big.Int) (common.Hash, error) { + expectedL1BlockRef, err := c.L1Client.HeaderByNumber(ctx, number) + if err != nil { + return common.Hash{}, err + } + + return expectedL1BlockRef.Hash(), nil +} diff --git a/espresso/streamer.go b/espresso/streamer.go index d73e611eddaf4..d2ce68f01bdd4 100644 --- a/espresso/streamer.go +++ b/espresso/streamer.go @@ -74,6 +74,7 @@ type BatchStreamer[B Batch] struct { Namespace uint64 L1Client L1Client + RollupL1Client L1Client EspressoClient EspressoClient EspressoLightClient LightClientCallerInterface Log log.Logger @@ -87,6 +88,8 @@ type BatchStreamer[B Batch] struct { fallbackBatchPos uint64 // HotShot position that we can fallback to, guaranteeing not to skip any unsafe batches fallbackHotShotPos uint64 + // HotShot position we start reading from, exclusive + originHotShotPos uint64 // Latest finalized block on the L1. FinalizedL1 eth.L1BlockRef @@ -110,14 +113,17 @@ var _ EspressoStreamer[Batch] = (*BatchStreamer[Batch])(nil) func NewEspressoStreamer[B Batch]( namespace uint64, l1Client L1Client, + rollupL1Client L1Client, espressoClient EspressoClient, lightClient LightClientCallerInterface, log log.Logger, unmarshalBatch func([]byte) (*B, error), pollingHotShotPollingInterval time.Duration, + originHotShotPos uint64, ) *BatchStreamer[B] { return &BatchStreamer[B]{ L1Client: l1Client, + RollupL1Client: rollupL1Client, EspressoClient: espressoClient, EspressoLightClient: lightClient, Log: log, @@ -127,6 +133,9 @@ func NewEspressoStreamer[B Batch]( PollingHotShotPollingInterval: pollingHotShotPollingInterval, RemainingBatches: make(map[common.Hash]B), unmarshalBatch: unmarshalBatch, + originHotShotPos: originHotShotPos, + fallbackHotShotPos: originHotShotPos, + hotShotPos: originHotShotPos, } } @@ -147,7 +156,10 @@ func (s *BatchStreamer[B]) RefreshSafeL1Origin(safeL1Origin eth.BlockID) error { s.Reset() } - return err + if err != nil { + return fmt.Errorf("failed to confirm espresso block height: %w", err) + } + return nil } // Update streamer state based on L1 and L2 sync status @@ -155,7 +167,7 @@ func (s *BatchStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockR s.FinalizedL1 = finalizedL1 if err := s.RefreshSafeL1Origin(safeL1Origin); err != nil { - return err + return fmt.Errorf("failed to refresh safe L1 origin: %w", err) } // NOTE: be sure to update s.finalizedL1 before checking this condition and returning @@ -187,7 +199,7 @@ func (s *BatchStreamer[B]) CheckBatch(ctx context.Context, batch B) (BatchValidi return BatchUndecided, 0 } - l1headerHash, err := s.L1Client.HeaderHashByNumber(ctx, new(big.Int).SetUint64(origin.Number)) + l1headerHash, err := s.RollupL1Client.HeaderHashByNumber(ctx, new(big.Int).SetUint64(origin.Number)) if err != nil { // Signal to resync to be able to fetch the L1 header. s.Log.Warn("Failed to fetch the L1 header, pending resync", "error", err) @@ -263,7 +275,7 @@ func (s *BatchStreamer[B]) Update(ctx context.Context) error { // the current block height available to process. currentBlockHeight, err := s.EspressoClient.FetchLatestBlockHeight(ctx) if err != nil { - return err + return fmt.Errorf("failed to fetch latest block height: %w", err) } // Streaming API implementation @@ -544,10 +556,10 @@ func (s *BatchStreamer[B]) confirmEspressoBlockHeight(safeL1Origin eth.BlockID) hotshotState, err := s.EspressoLightClient. FinalizedState(&bind.CallOpts{BlockNumber: new(big.Int).SetUint64(safeL1Origin.Number)}) if errors.Is(err, bind.ErrNoCode) { - s.fallbackHotShotPos = 0 + s.fallbackHotShotPos = s.originHotShotPos return false, nil } else if err != nil { - return false, err + return false, fmt.Errorf("failed to get finalized state from light client: %w", err) } shouldReset = hotshotState.BlockHeight < s.fallbackHotShotPos diff --git a/espresso/streamer_test.go b/espresso/streamer_test.go index ba3c3a0574986..81d55602309ef 100644 --- a/espresso/streamer_test.go +++ b/espresso/streamer_test.go @@ -35,8 +35,10 @@ func TestNewEspressoStreamer(t *testing.T) { _ = espresso.NewEspressoStreamer( 1, nil, + nil, nil, nil, nil, derive.CreateEspressoBatchUnmarshaler(common.Address{}), 50*time.Millisecond, + 0, ) } @@ -355,9 +357,11 @@ func setupStreamerTesting(namespace uint64, batcherAddress common.Address) (*Moc state, state, state, + state, logger, derive.CreateEspressoBatchUnmarshaler(batcherAddress), 50*time.Millisecond, + 0, ) return state, streamer diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 2683ad213619f..963941a3ba885 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/urfave/cli/v2" @@ -187,14 +186,6 @@ func (c *CLIConfig) Check() error { return err } - if c.Espresso.L1URL == "" { - log.Warn("Espresso L1 URL not provided, using L1EthRpc") - c.Espresso.L1URL = c.L1EthRpc - } - if err := c.Espresso.Check(); err != nil { - return err - } - return nil } diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 4d52812b15bba..bca560b380fd8 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -135,6 +135,9 @@ type BatchSubmitter struct { prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus espressoSubmitter *espressoTransactionSubmitter + // Group to limit number of concurrent batches waiting for approval + // from BatchAuthenticator contract, only relevant when running with Espresso enabled + teeAuthGroup errgroup.Group } // NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup @@ -210,6 +213,12 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { l.espressoSubmitter.SpawnWorkers(4, 4) l.espressoSubmitter.Start() + // Limit teeAuthGroup to at most 128 concurrent goroutines as an arbitrary + // not-too-big limit for the number of BatchInbox transactions that can be + // simultaneously waiting for corresponding BatchAuthenticator transaction to be + // confirmed before submission to L1. + l.teeAuthGroup.SetLimit(128) + l.wg.Add(4) go l.receiptsLoop(l.wg, receiptsCh) // ranges over receiptsCh channel go l.espressoBatchQueueingLoop(l.shutdownCtx, l.wg) @@ -523,6 +532,14 @@ func (l *BatchSubmitter) publishingLoop(ctx context.Context, wg *sync.WaitGroup, } } + // Wait for all transactions requiring TEE authentication to complete to prevent new + // transactions being queued + if err := l.teeAuthGroup.Wait(); err != nil { + if !errors.Is(err, context.Canceled) { + l.Log.Error("error waiting for transaction authentication requests to complete", "err", err) + } + } + // We _must_ wait for all senders on receiptsCh to finish before we can close it. if err := txQueue.Wait(); err != nil { if !errors.Is(err, context.Canceled) { @@ -908,7 +925,7 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh panic(err) // this error should not happen } l.Log.Warn("sending a cancellation transaction to unblock txpool", "blocked_blob", isBlockedBlob) - l.sendTx(txData{}, true, candidate, queue, receiptsCh, nil) + l.sendTx(txData{}, true, candidate, queue, receiptsCh) } // publishToAltDAAndStoreCommitment posts the txdata to the DA Provider and stores the returned commitment @@ -992,7 +1009,7 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef if candidate == nil { l.Log.Crit("txcandidate should have been set by one of the three branches above.") } - l.sendTx(txdata, false, candidate, queue, receiptsCh, daGroup) + l.sendTx(txdata, false, candidate, queue, receiptsCh) return nil } @@ -1002,18 +1019,14 @@ type TxSender[T any] interface { // sendTx uses the txmgr queue to send the given transaction candidate after setting its // gaslimit. It will block if the txmgr queue has reached its MaxPendingTransactions limit. -func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue TxSender[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) { +func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.TxCandidate, queue TxSender[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) { if l.Config.UseEspresso && !isCancel { - goroutineSpawned := daGroup.TryGo( + l.teeAuthGroup.Go( func() error { l.sendEspressoTx(txdata, isCancel, candidate, queue, receiptsCh) return nil }, ) - if !goroutineSpawned { - log.Warn("failed to spawn Espresso tx goroutine") - l.recordFailedDARequest(txdata.ID(), nil) - } return } floorDataGas, err := core.FloorDataGas(candidate.TxData) diff --git a/op-batcher/batcher/driver_test.go b/op-batcher/batcher/driver_test.go index c22ef9fa37ceb..1663ecc588b59 100644 --- a/op-batcher/batcher/driver_test.go +++ b/op-batcher/batcher/driver_test.go @@ -168,8 +168,7 @@ func TestBatchSubmitter_sendTx_FloorDataGas(t *testing.T) { false, &candidate, q, - make(chan txmgr.TxReceipt[txRef]), - nil) + make(chan txmgr.TxReceipt[txRef])) candidateOut := q.Load(txData.ID().String()) diff --git a/op-batcher/batcher/espresso.go b/op-batcher/batcher/espresso.go index ee4e2052b6ae3..a42f73813f8cf 100644 --- a/op-batcher/batcher/espresso.go +++ b/op-batcher/batcher/espresso.go @@ -702,28 +702,6 @@ func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStat } } -// AdaptL1BlockRefClient is a wrapper around eth.L1BlockRef that implements the espresso.L1Client interface -type AdaptL1BlockRefClient struct { - L1Client L1Client -} - -// NewAdaptL1BlockRefClient creates a new L1BlockRefClient -func NewAdaptL1BlockRefClient(L1Client L1Client) *AdaptL1BlockRefClient { - return &AdaptL1BlockRefClient{ - L1Client: L1Client, - } -} - -// HeaderHashByNumber implements the espresso.L1Client interface -func (c *AdaptL1BlockRefClient) HeaderHashByNumber(ctx context.Context, number *big.Int) (common.Hash, error) { - expectedL1BlockRef, err := c.L1Client.HeaderByNumber(ctx, number) - if err != nil { - return common.Hash{}, err - } - - return expectedL1BlockRef.Hash(), nil -} - // Periodically refreshes the sync status and polls Espresso streamer for new batches func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.WaitGroup, publishSignal chan struct{}) { l.Log.Info("Starting EspressoBatchLoadingLoop", "polling interval", l.Config.EspressoPollInterval) diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index e4d7439ccae5d..210a9c2a0819c 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -10,7 +10,6 @@ import ( "time" espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" - espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" "github.com/ethereum-optimism/optimism/espresso" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" "github.com/ethereum/go-ethereum/ethclient" @@ -93,10 +92,9 @@ type BatcherService struct { NotSubmittingOnStart bool opcrypto.ChainSigner - EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch] - EspressoClient espressoClient.EspressoClient - EspressoLightClient *espressoLightClient.LightclientCaller - Attestation *nitrite.Result + EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch] + EspressoClient espressoClient.EspressoClient + Attestation *nitrite.Result } type DriverSetupOption func(setup *DriverSetup) @@ -543,37 +541,46 @@ func (bs *BatcherService) initEspresso(cfg *CLIConfig) error { return nil } + if cfg.Espresso.RollupL1URL == "" { + cfg.Espresso.RollupL1URL = cfg.L1EthRpc + } + + if cfg.Espresso.RollupL1URL != cfg.L1EthRpc { + log.Warn("Espresso Rollup L1 URL differs from batcher's L1EthRpc") + } + + if cfg.Espresso.L1URL == "" { + log.Warn("Espresso L1 URL not provided, using batcher's L1EthRpc") + cfg.Espresso.L1URL = cfg.L1EthRpc + } + if cfg.Espresso.Namespace == 0 { + log.Info("Using L2 chain ID as namespace by default") + cfg.Espresso.Namespace = bs.RollupConfig.L2ChainID.Uint64() + } + + if err := cfg.Espresso.Check(); err != nil { + return fmt.Errorf("invalid Espresso config: %w", err) + } + bs.UseEspresso = true bs.EspressoPollInterval = cfg.Espresso.PollInterval - client, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs) + espressoClient, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs) if err != nil { return fmt.Errorf("failed to create Espresso client: %w", err) } - bs.EspressoClient = client - - espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.Espresso.LightClientAddr, bs.L1Client) - if err != nil { - return fmt.Errorf("failed to create Espresso light client") - } - bs.EspressoLightClient = espressoLightClient + bs.EspressoClient = espressoClient if err := bs.initKeyPair(); err != nil { return fmt.Errorf("failed to create key pair for batcher: %w", err) } - unbufferedStreamer := espresso.NewEspressoStreamer( - bs.RollupConfig.L2ChainID.Uint64(), - NewAdaptL1BlockRefClient(bs.L1Client), - client, - bs.EspressoLightClient, - bs.Log, - func(data []byte) (*derive.EspressoBatch, error) { - return derive.UnmarshalEspressoTransaction(data, bs.TxManager.From()) - }, - 2*time.Second, - ) - unbufferedStreamer.UseFetchApi = cfg.Espresso.UseFetchAPI + unbufferedStreamer, err := espresso.BatchStreamerFromCLIConfig(cfg.Espresso, bs.Log, func(data []byte) (*derive.EspressoBatch, error) { + return derive.UnmarshalEspressoTransaction(data, bs.TxManager.From()) + }) + if err != nil { + return fmt.Errorf("failed to create unbuffered Espresso streamer: %w", err) + } // We wrap the streamer in a BufferedStreamer to reduce impact of streamer resets bs.EspressoStreamer = espresso.NewBufferedEspressoStreamer(unbufferedStreamer) diff --git a/op-batcher/enclave-entrypoint.bash b/op-batcher/enclave-entrypoint.bash index 4684d00b36253..cd06e79ea096a 100644 --- a/op-batcher/enclave-entrypoint.bash +++ b/op-batcher/enclave-entrypoint.bash @@ -6,7 +6,7 @@ # to directly pass commandline arguments when starting EIF images) # We will need to start a proxy for each of those urls -URL_ARG_RE='^(--altda\.da-server|--espresso\.urls|--espresso.\l1-url|--l1-eth-rpc|--l2-eth-rpc|--rollup-rpc|--signer\.endpoint)(=|$)' +URL_ARG_RE='^(--altda\.da-server|--espresso\.urls|--espresso.\l1-url|--espresso.rollup-l1-url|--l1-eth-rpc|--l2-eth-rpc|--rollup-rpc|--signer\.endpoint)(=|$)' # Re-populate the arguments passed through the environment if [ -n "$ENCLAVE_BATCHER_ARGS" ]; then diff --git a/op-e2e/system/e2esys/setup.go b/op-e2e/system/e2esys/setup.go index 4e2aef04b46ae..b99b6ddbffbbc 100644 --- a/op-e2e/system/e2esys/setup.go +++ b/op-e2e/system/e2esys/setup.go @@ -1024,6 +1024,7 @@ func (cfg SystemConfig) Start(t *testing.T, startOpts ...StartOption) (*System, Enabled: (cfg.AllocType == config.AllocTypeEspressoWithEnclave) || (cfg.AllocType == config.AllocTypeEspressoWithoutEnclave), PollInterval: 250 * time.Millisecond, L1URL: sys.EthInstances[RoleL1].UserRPC().RPC(), + RollupL1URL: sys.EthInstances[RoleL1].UserRPC().RPC(), TestingBatcherPrivateKey: testingBatcherPk, } diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index 33b8bd16b41eb..103e4a7348c8b 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -9,11 +9,8 @@ import ( "github.com/ethereum-optimism/optimism/espresso" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" - espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" - espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-service/eth" ) @@ -74,60 +71,37 @@ type SingularBatchProvider interface { NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error) } -func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher) *espresso.BatchStreamer[EspressoBatch] { - +func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *espresso.BatchStreamer[EspressoBatch] { if !cfg.CaffNodeConfig.Enabled { log.Info("Espresso streamer not initialized: Caff node is not enabled") return nil } - // Create an adapter that implements espresso.L1Client - l1BlockRefClient := NewL1BlockRefClient(l1Fetcher) - - l1Client, err := ethclient.Dial(cfg.CaffNodeConfig.L1URL) - if err != nil { - log.Error("Espresso streamer not initialized: Failed to connect to L1", "err", err) - return nil + if cfg.CaffNodeConfig.Namespace == 0 { + log.Info("Using L2 chain ID as namespace by default") + cfg.CaffNodeConfig.Namespace = cfg.L2ChainID.Uint64() } - lightClient, err := espressoLightClient.NewLightclientCaller(cfg.CaffNodeConfig.LightClientAddr, l1Client) + streamer, err := espresso.BatchStreamerFromCLIConfig(cfg.CaffNodeConfig, log, func(data []byte) (*EspressoBatch, error) { + return UnmarshalEspressoTransaction(data, cfg.Genesis.SystemConfig.BatcherAddr) + }) if err != nil { - log.Error("Espresso streamer not initialized: Failed to connect to light client", "err", err) + log.Error("Failed to initialize Espresso streamer", "err", err) return nil } - client, err := espressoClient.NewMultipleNodesClient(cfg.CaffNodeConfig.QueryServiceURLs) - if err != nil { - log.Error("Espresso streamer not initialized: Failed to connect to hotshot client", "err", err) - return nil - } - streamer := espresso.NewEspressoStreamer( - cfg.L2ChainID.Uint64(), - l1BlockRefClient, - client, - lightClient, - log, - func(data []byte) (*EspressoBatch, error) { - return UnmarshalEspressoTransaction(data, cfg.Genesis.SystemConfig.BatcherAddr) - }, - cfg.CaffNodeConfig.PollInterval, - ) - streamer.UseFetchApi = cfg.CaffNodeConfig.UseFetchAPI - - log.Debug("Espresso Streamer namespace:", streamer.Namespace) - - log.Info("Espresso streamer initialized", "namespace", cfg.L2ChainID.Uint64(), "polling hotshot polling interval", cfg.CaffNodeConfig.PollInterval, "hotshot urls", cfg.CaffNodeConfig.QueryServiceURLs) + log.Info("Espresso streamer initialized", "namespace", streamer.Namespace, "hotshot polling interval", cfg.CaffNodeConfig.PollInterval, "hotshot urls", cfg.CaffNodeConfig.QueryServiceURLs) return streamer } -func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev SingularBatchProvider, l1Fetcher L1Fetcher) *AttributesQueue { +func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev SingularBatchProvider) *AttributesQueue { return &AttributesQueue{ log: log, config: cfg, builder: builder, prev: prev, isCaffNode: cfg.CaffNodeConfig.Enabled, - espressoStreamer: initEspressoStreamer(log, cfg, l1Fetcher), + espressoStreamer: initEspressoStreamer(log, cfg), } } @@ -147,12 +121,11 @@ func CaffNextBatch(s *espresso.BatchStreamer[EspressoBatch], ctx context.Context // Get the L1 finalized block finalizedL1Block, err := l1Fetcher.L1BlockRefByLabel(ctx, eth.Finalized) if err != nil { - s.Log.Error("failed to get the L1 finalized block", "err", err) - return nil, false, err + return nil, false, fmt.Errorf("failed to get the L1 finalized block: %w", err) } // Refresh the sync status if err := s.Refresh(ctx, finalizedL1Block, parent.Number, parent.L1Origin); err != nil { - return nil, false, err + return nil, false, fmt.Errorf("failed to refresh Espresso streamer: %w", err) } // Update the streamer if needed diff --git a/op-node/rollup/derive/attributes_queue_test.go b/op-node/rollup/derive/attributes_queue_test.go index afa2c66952d7d..2f6e59141e4c7 100644 --- a/op-node/rollup/derive/attributes_queue_test.go +++ b/op-node/rollup/derive/attributes_queue_test.go @@ -79,7 +79,7 @@ func TestAttributesQueue(t *testing.T) { } attrBuilder := NewFetchingAttributesBuilder(cfg, nil, l1Fetcher, l2Fetcher) - aq := NewAttributesQueue(testlog.Logger(t, log.LevelError), cfg, attrBuilder, nil, l1Fetcher) + aq := NewAttributesQueue(testlog.Logger(t, log.LevelError), cfg, attrBuilder, nil) actual, err := aq.createNextAttributes(context.Background(), &batch, safeHead) diff --git a/op-node/rollup/derive/espresso_caff_l1_block_ref_client.go b/op-node/rollup/derive/espresso_caff_l1_block_ref_client.go deleted file mode 100644 index 415ae7b8c8923..0000000000000 --- a/op-node/rollup/derive/espresso_caff_l1_block_ref_client.go +++ /dev/null @@ -1,30 +0,0 @@ -package derive - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum/common" -) - -// L1BlockRefClient is a wrapper around eth.L1BlockRef that implements the espresso.L1Client interface -type L1BlockRefClient struct { - L1Fetcher L1Fetcher -} - -// NewL1BlockRefClient creates a new L1BlockRefClient -func NewL1BlockRefClient(L1Fetcher L1Fetcher) *L1BlockRefClient { - return &L1BlockRefClient{ - L1Fetcher: L1Fetcher, - } -} - -// HeaderHashByNumber implements the espresso.L1Client interface -func (c *L1BlockRefClient) HeaderHashByNumber(ctx context.Context, number *big.Int) (common.Hash, error) { - expectedL1BlockRef, err := c.L1Fetcher.L1BlockRefByNumber(ctx, number.Uint64()) - if err != nil { - return common.Hash{}, err - } - - return expectedL1BlockRef.Hash, nil -} diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 83e7951714d51..2991a4aef2ccb 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -117,7 +117,7 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, depSet Depe chInReader := NewChannelInReader(rollupCfg, log, channelMux, metrics) batchMux := NewBatchMux(log, rollupCfg, chInReader, l2Source) attrBuilder := NewFetchingAttributesBuilder(rollupCfg, depSet, l1Fetcher, l2Source) - attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchMux, l1Fetcher) + attributesQueue := NewAttributesQueue(log, rollupCfg, attrBuilder, batchMux) // Reset from ResetEngine then up from L1 Traversal. The stages do not talk to each other during // the ResetEngine, but after the ResetEngine, this is the order in which the stages could talk to each other. diff --git a/op-node/service.go b/op-node/service.go index 8cb9c881fd2c1..29eafb315b472 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -69,6 +69,14 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*config.Config, error) { l1Endpoint := NewL1EndpointConfig(ctx) + if rollupConfig.CaffNodeConfig.RollupL1URL == "" { + rollupConfig.CaffNodeConfig.RollupL1URL = l1Endpoint.L1NodeAddr + } + + if l1Endpoint.L1NodeAddr != rollupConfig.CaffNodeConfig.RollupL1URL { + log.Warn("Espresso streamer rollup L1 URL does not match L1 node address of caff node", "rollupL1URL", rollupConfig.CaffNodeConfig.RollupL1URL, "l1NodeAddr", l1Endpoint.L1NodeAddr) + } + l2Endpoint, err := NewL2EndpointConfig(ctx, log) if err != nil { return nil, fmt.Errorf("failed to load l2 endpoints info: %w", err)