diff --git a/espresso/docker-compose.yml b/espresso/docker-compose.yml index 99ab55e1092..0c4cab4ac0d 100644 --- a/espresso/docker-compose.yml +++ b/espresso/docker-compose.yml @@ -302,6 +302,7 @@ services: - --rpc.addr=0.0.0.0 - --sequencer.enabled=false - --espresso.enabled=true + - --espresso.fetch-api=true - --verifier.l1-confs=0 - --rollup.load-protocol-versions=false - --rollup.halt=none @@ -344,6 +345,7 @@ services: command: - op-batcher - --espresso.enabled=true + - --espresso.fetch-api=true - --espresso.poll-interval=1s - --espresso.light-client-addr=0x703848f4c85f18e3acd8196c8ec91eb0b7bd0797 - --espresso.testing-batcher-private-key=${OP_TESTING_BATCHER_PRIVATE_KEY:-$OPERATOR_PRIVATE_KEY} diff --git a/espresso/docker/op-batcher-tee/run-enclave.sh b/espresso/docker/op-batcher-tee/run-enclave.sh index c1682e8c6f9..ccc98397114 100755 --- a/espresso/docker/op-batcher-tee/run-enclave.sh +++ b/espresso/docker/op-batcher-tee/run-enclave.sh @@ -46,6 +46,7 @@ BATCHER_ARGS="$BATCHER_ARGS,--hd-path=m/44'/60'/0'/0/0" BATCHER_ARGS="$BATCHER_ARGS,--throttle-threshold=0" BATCHER_ARGS="$BATCHER_ARGS,--max-channel-duration=1" BATCHER_ARGS="$BATCHER_ARGS,--target-num-frames=1" +BATCHER_ARGS="$BATCHER_ARGS,--espresso.fetch-api=true" BATCHER_ARGS="$BATCHER_ARGS,--espresso.light-client-addr=0x703848f4c85f18e3acd8196c8ec91eb0b7bd0797" # Add debug arguments if enabled diff --git a/espresso/environment/3_2_espresso_deterministic_state_test.go b/espresso/environment/3_2_espresso_deterministic_state_test.go index a9f7118972d..b9047768ba0 100644 --- a/espresso/environment/3_2_espresso_deterministic_state_test.go +++ b/espresso/environment/3_2_espresso_deterministic_state_test.go @@ -345,7 +345,7 @@ func TestValidEspressoTransactionCreation(t *testing.T) { // Make sure the transaction will go through to op node by checking it will go through batch submitter's streamer batchSubmitter := system.BatchSubmitter - _, err = batchSubmitter.EspressoStreamer().UnmarshalBatch(realEspressoTransaction.Payload) + _, err = batchSubmitter.EspressoStreamer.UnmarshalBatch(realEspressoTransaction.Payload) if have, want := err, error(nil); have != want { t.Fatalf("Failed to unmarshal batch:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) } diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 7f50cd45275..0d31a8449bf 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" - espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" "github.com/ethereum-optimism/optimism/espresso" altda "github.com/ethereum-optimism/optimism/op-alt-da" "github.com/ethereum-optimism/optimism/op-batcher/metrics" @@ -106,11 +105,10 @@ type DriverSetup struct { ChannelOutFactory ChannelOutFactory ActiveSeqChanged chan struct{} // optional - Espresso *espressoClient.MultipleNodesClient - EspressoLightClient *espressoLightClient.LightclientCaller - ChainSigner opcrypto.ChainSigner - SequencerAddress common.Address - Attestation *nitrite.Result + EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch] + EspressoClient espressoClient.EspressoClient + ChainSigner opcrypto.ChainSigner + Attestation *nitrite.Result } // BatchSubmitter encapsulates a service responsible for submitting L2 tx @@ -134,7 +132,6 @@ type BatchSubmitter struct { prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus espressoSubmitter *espressoTransactionSubmitter - espressoStreamer espresso.EspressoStreamer[derive.EspressoBatch] } // NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup @@ -149,22 +146,6 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { channelMgr: state, } - batchSubmitter.espressoStreamer = espresso.NewBufferedEspressoStreamer( - espresso.NewEspressoStreamer( - batchSubmitter.RollupConfig.L2ChainID.Uint64(), - NewAdaptL1BlockRefClient(batchSubmitter.L1Client), - batchSubmitter.Espresso, - batchSubmitter.EspressoLightClient, - batchSubmitter.Log, - func(data []byte) (*derive.EspressoBatch, error) { - return derive.UnmarshalEspressoTransaction(data, batchSubmitter.SequencerAddress) - }, - 2*time.Second, - ), - ) - - log.Info("Streamer started", "streamer", batchSubmitter.espressoStreamer) - return batchSubmitter } @@ -221,7 +202,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error { l.espressoSubmitter = NewEspressoTransactionSubmitter( WithContext(l.shutdownCtx), WithWaitGroup(l.wg), - WithEspressoClient(l.Espresso), + WithEspressoClient(l.EspressoClient), ) l.espressoSubmitter.SpawnWorkers(4, 4) l.espressoSubmitter.Start() @@ -781,7 +762,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) { defer l.channelMgrMutex.Unlock() l.channelMgr.Clear(l1SafeOrigin) if l.Config.UseEspresso { - l.espressoStreamer.Reset() + l.EspressoStreamer.Reset() } return true } diff --git a/op-batcher/batcher/espresso.go b/op-batcher/batcher/espresso.go index 1b71a91cb97..d91f79f8ec6 100644 --- a/op-batcher/batcher/espresso.go +++ b/op-batcher/batcher/espresso.go @@ -19,8 +19,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum-optimism/optimism/espresso" - espressoLocal "github.com/ethereum-optimism/optimism/espresso" "github.com/ethereum-optimism/optimism/op-batcher/bindings" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/eth" @@ -650,10 +648,6 @@ func (s *espressoTransactionSubmitter) Start() { go s.handleVerifyReceiptJobResponse() } -func (bs *BatcherService) EspressoStreamer() espressoLocal.EspressoStreamer[derive.EspressoBatch] { - return bs.driver.espressoStreamer -} - func (bs *BatcherService) initKeyPair() error { key, err := crypto.GenerateKey() if err != nil { @@ -664,11 +658,6 @@ func (bs *BatcherService) initKeyPair() error { return nil } -// EspressoStreamer returns the batch submitter's Espresso streamer instance -func (l *BatchSubmitter) EspressoStreamer() espresso.EspressoStreamer[derive.EspressoBatch] { - return l.espressoStreamer -} - // Converts a block to an EspressoBatch and starts a goroutine that publishes it to Espresso // Returns error only if batch conversion fails, otherwise it is infallible, as the goroutine // will retry publishing until successful. @@ -691,7 +680,7 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types. } func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) { - err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin) + err := l.EspressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin) if err != nil { l.Log.Warn("Failed to refresh Espresso streamer", "err", err) } @@ -706,7 +695,7 @@ func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStat l.prevCurrentL1 = newSyncStatus.CurrentL1 if syncActions.clearState != nil { l.channelMgr.Clear(*syncActions.clearState) - l.espressoStreamer.Reset() + l.EspressoStreamer.Reset() } else { l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune) l.channelMgr.PruneChannels(syncActions.channelsToPrune) @@ -755,13 +744,13 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync. l.espressoSyncAndRefresh(ctx, newSyncStatus) - err = l.espressoStreamer.Update(ctx) + err = l.EspressoStreamer.Update(ctx) var batch *derive.EspressoBatch for { - batch = l.espressoStreamer.Next(ctx) + batch = l.EspressoStreamer.Next(ctx) if batch == nil { break @@ -789,7 +778,7 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync. if err != nil { l.Log.Error("failed to add L2 block to channel manager", "err", err) l.clearState(ctx) - l.espressoStreamer.Reset() + l.EspressoStreamer.Reset() } l.Log.Info("Added L2 block to channel manager") diff --git a/op-batcher/batcher/service.go b/op-batcher/batcher/service.go index d3ae735c653..c297a02054b 100644 --- a/op-batcher/batcher/service.go +++ b/op-batcher/batcher/service.go @@ -12,6 +12,7 @@ import ( espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" + "github.com/ethereum-optimism/optimism/espresso" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" @@ -25,6 +26,7 @@ import ( "github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/params" "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/cliapp" "github.com/ethereum-optimism/optimism/op-service/dial" "github.com/ethereum-optimism/optimism/op-service/eth" @@ -92,7 +94,8 @@ type BatcherService struct { NotSubmittingOnStart bool opcrypto.ChainSigner - Espresso *espressoClient.MultipleNodesClient + EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch] + EspressoClient espressoClient.EspressoClient EspressoLightClient *espressoLightClient.LightclientCaller Attestation *nitrite.Result } @@ -136,53 +139,6 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, } opts = append(optsFromRPC, opts...) - if cfg.Espresso.Enabled { - bs.EspressoPollInterval = cfg.Espresso.PollInterval - client, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs) - if err != nil { - return fmt.Errorf("failed to create Espresso client: %w", err) - } - bs.Espresso = client - espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.Espresso.LightClientAddr, bs.L1Client) - if err != nil { - return fmt.Errorf("failed to create Espresso light client") - } - bs.EspressoLightClient = espressoLightClient - bs.UseEspresso = true - if err := bs.initKeyPair(); err != nil { - return fmt.Errorf("failed to create key pair for batcher: %w", err) - } - - // try to generate attestationBytes on public key when start batcher - attestationBytes, err := enclave.AttestationWithPublicKey(bs.BatcherPublicKey) - if err != nil { - bs.Log.Info("Not running in enclave, skipping attestation", "info", err) - - // Replace ephemeral keys with configured keys, as in devnet they'll be pre-approved for batching - privateKey := cfg.Espresso.TestingBatcherPrivateKey - if privateKey == nil { - return fmt.Errorf("when not running in enclave, testing batcher private key should be set") - } - - publicKey := privateKey.Public() - publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) - if !ok { - return fmt.Errorf("error casting public key to ECDSA") - } - - bs.BatcherPrivateKey = privateKey - bs.BatcherPublicKey = publicKeyECDSA - } else { - // output length of attestation - bs.Log.Info("Successfully got attestation. Attestation length", "length", len(attestationBytes)) - result, err := nitrite.Verify(attestationBytes, nitrite.VerifyOptions{}) - if err != nil { - return fmt.Errorf("Couldn't verify attestation: %w", err) - } - bs.Attestation = result - } - } - if err := bs.initRollupConfig(ctx); err != nil { return fmt.Errorf("failed to load rollup config: %w", err) } @@ -203,6 +159,9 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string, if err := bs.initPProf(cfg); err != nil { return fmt.Errorf("failed to init profiling: %w", err) } + if err := bs.initEspresso(cfg); err != nil { + return fmt.Errorf("failed to init Espresso: %w", err) + } bs.initDriver(opts...) if err := bs.initRPCServer(cfg); err != nil { return fmt.Errorf("failed to start RPC server: %w", err) @@ -449,11 +408,10 @@ func (bs *BatcherService) initDriver(opts ...DriverSetupOption) { ChannelConfig: bs.ChannelConfig, AltDA: bs.AltDA, - SequencerAddress: bs.TxManager.From(), - ChainSigner: bs.ChainSigner, - Espresso: bs.Espresso, - EspressoLightClient: bs.EspressoLightClient, - Attestation: bs.Attestation, + EspressoStreamer: bs.EspressoStreamer, + EspressoClient: bs.EspressoClient, + ChainSigner: bs.ChainSigner, + Attestation: bs.Attestation, } for _, opt := range opts { opt(&ds) @@ -601,3 +559,75 @@ func (bs *BatcherService) HTTPEndpoint() string { } return "http://" + bs.rpcServer.Endpoint() } + +func (bs *BatcherService) initEspresso(cfg *CLIConfig) error { + if !cfg.Espresso.Enabled { + return nil + } + + bs.UseEspresso = true + bs.EspressoPollInterval = cfg.Espresso.PollInterval + + client, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs) + if err != nil { + return fmt.Errorf("failed to create Espresso client: %w", err) + } + bs.EspressoClient = client + + espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.Espresso.LightClientAddr, bs.L1Client) + if err != nil { + return fmt.Errorf("failed to create Espresso light client") + } + bs.EspressoLightClient = espressoLightClient + + if err := bs.initKeyPair(); err != nil { + return fmt.Errorf("failed to create key pair for batcher: %w", err) + } + + unbufferedStreamer := espresso.NewEspressoStreamer( + bs.RollupConfig.L2ChainID.Uint64(), + NewAdaptL1BlockRefClient(bs.L1Client), + client, + bs.EspressoLightClient, + bs.Log, + func(data []byte) (*derive.EspressoBatch, error) { + return derive.UnmarshalEspressoTransaction(data, bs.TxManager.From()) + }, + 2*time.Second, + ) + unbufferedStreamer.UseFetchApi = cfg.Espresso.UseFetchAPI + + // We wrap the streamer in a BufferedStreamer to reduce impact of streamer resets + bs.EspressoStreamer = espresso.NewBufferedEspressoStreamer(unbufferedStreamer) + + // try to generate attestationBytes on public key when start batcher + attestationBytes, err := enclave.AttestationWithPublicKey(bs.BatcherPublicKey) + if err != nil { + bs.Log.Info("Not running in enclave, skipping attestation", "info", err) + + // Replace ephemeral keys with configured keys, as in devnet they'll be pre-approved for batching + privateKey := cfg.Espresso.TestingBatcherPrivateKey + if privateKey == nil { + return fmt.Errorf("when not running in enclave, testing batcher private key should be set") + } + + publicKey := privateKey.Public() + publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey) + if !ok { + return fmt.Errorf("error casting public key to ECDSA") + } + + bs.BatcherPrivateKey = privateKey + bs.BatcherPublicKey = publicKeyECDSA + } else { + // output length of attestation + bs.Log.Info("Successfully got attestation. Attestation length", "length", len(attestationBytes)) + result, err := nitrite.Verify(attestationBytes, nitrite.VerifyOptions{}) + if err != nil { + return fmt.Errorf("Couldn't verify attestation: %w", err) + } + bs.Attestation = result + } + + return nil +} diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index d88d2ea5c67..33b8bd16b41 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -112,6 +112,7 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetche }, cfg.CaffNodeConfig.PollInterval, ) + streamer.UseFetchApi = cfg.CaffNodeConfig.UseFetchAPI log.Debug("Espresso Streamer namespace:", streamer.Namespace)