Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions espresso/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ services:
- --rpc.addr=0.0.0.0
- --sequencer.enabled=false
- --espresso.enabled=true
- --espresso.fetch-api=true
- --verifier.l1-confs=0
- --rollup.load-protocol-versions=false
- --rollup.halt=none
Expand Down Expand Up @@ -344,6 +345,7 @@ services:
command:
- op-batcher
- --espresso.enabled=true
- --espresso.fetch-api=true
- --espresso.poll-interval=1s
- --espresso.light-client-addr=0x703848f4c85f18e3acd8196c8ec91eb0b7bd0797
- --espresso.testing-batcher-private-key=${OP_TESTING_BATCHER_PRIVATE_KEY:-$OPERATOR_PRIVATE_KEY}
Expand Down
1 change: 1 addition & 0 deletions espresso/docker/op-batcher-tee/run-enclave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ BATCHER_ARGS="$BATCHER_ARGS,--hd-path=m/44'/60'/0'/0/0"
BATCHER_ARGS="$BATCHER_ARGS,--throttle-threshold=0"
BATCHER_ARGS="$BATCHER_ARGS,--max-channel-duration=1"
BATCHER_ARGS="$BATCHER_ARGS,--target-num-frames=1"
BATCHER_ARGS="$BATCHER_ARGS,--espresso.fetch-api=true"
BATCHER_ARGS="$BATCHER_ARGS,--espresso.light-client-addr=0x703848f4c85f18e3acd8196c8ec91eb0b7bd0797"

# Add debug arguments if enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func TestValidEspressoTransactionCreation(t *testing.T) {

// Make sure the transaction will go through to op node by checking it will go through batch submitter's streamer
batchSubmitter := system.BatchSubmitter
_, err = batchSubmitter.EspressoStreamer().UnmarshalBatch(realEspressoTransaction.Payload)
_, err = batchSubmitter.EspressoStreamer.UnmarshalBatch(realEspressoTransaction.Payload)
if have, want := err, error(nil); have != want {
t.Fatalf("Failed to unmarshal batch:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want)
}
Expand Down
31 changes: 6 additions & 25 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"

espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client"
espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client"
"github.com/ethereum-optimism/optimism/espresso"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
Expand Down Expand Up @@ -106,11 +105,10 @@ type DriverSetup struct {
ChannelOutFactory ChannelOutFactory
ActiveSeqChanged chan struct{} // optional

Espresso *espressoClient.MultipleNodesClient
EspressoLightClient *espressoLightClient.LightclientCaller
ChainSigner opcrypto.ChainSigner
SequencerAddress common.Address
Attestation *nitrite.Result
EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch]
EspressoClient espressoClient.EspressoClient
ChainSigner opcrypto.ChainSigner
Attestation *nitrite.Result
}

// BatchSubmitter encapsulates a service responsible for submitting L2 tx
Expand All @@ -134,7 +132,6 @@ type BatchSubmitter struct {
prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus

espressoSubmitter *espressoTransactionSubmitter
espressoStreamer espresso.EspressoStreamer[derive.EspressoBatch]
}

// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
Expand All @@ -149,22 +146,6 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
channelMgr: state,
}

batchSubmitter.espressoStreamer = espresso.NewBufferedEspressoStreamer(
espresso.NewEspressoStreamer(
batchSubmitter.RollupConfig.L2ChainID.Uint64(),
NewAdaptL1BlockRefClient(batchSubmitter.L1Client),
batchSubmitter.Espresso,
batchSubmitter.EspressoLightClient,
batchSubmitter.Log,
func(data []byte) (*derive.EspressoBatch, error) {
return derive.UnmarshalEspressoTransaction(data, batchSubmitter.SequencerAddress)
},
2*time.Second,
),
)

log.Info("Streamer started", "streamer", batchSubmitter.espressoStreamer)

return batchSubmitter
}

Expand Down Expand Up @@ -221,7 +202,7 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
l.espressoSubmitter = NewEspressoTransactionSubmitter(
WithContext(l.shutdownCtx),
WithWaitGroup(l.wg),
WithEspressoClient(l.Espresso),
WithEspressoClient(l.EspressoClient),
)
l.espressoSubmitter.SpawnWorkers(4, 4)
l.espressoSubmitter.Start()
Expand Down Expand Up @@ -781,7 +762,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
defer l.channelMgrMutex.Unlock()
l.channelMgr.Clear(l1SafeOrigin)
if l.Config.UseEspresso {
l.espressoStreamer.Reset()
l.EspressoStreamer.Reset()
}
return true
}
Expand Down
21 changes: 5 additions & 16 deletions op-batcher/batcher/espresso.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/espresso"
espressoLocal "github.com/ethereum-optimism/optimism/espresso"
"github.com/ethereum-optimism/optimism/op-batcher/bindings"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -650,10 +648,6 @@ func (s *espressoTransactionSubmitter) Start() {
go s.handleVerifyReceiptJobResponse()
}

func (bs *BatcherService) EspressoStreamer() espressoLocal.EspressoStreamer[derive.EspressoBatch] {
return bs.driver.espressoStreamer
}

func (bs *BatcherService) initKeyPair() error {
key, err := crypto.GenerateKey()
if err != nil {
Expand All @@ -664,11 +658,6 @@ func (bs *BatcherService) initKeyPair() error {
return nil
}

// EspressoStreamer returns the batch submitter's Espresso streamer instance
func (l *BatchSubmitter) EspressoStreamer() espresso.EspressoStreamer[derive.EspressoBatch] {
return l.espressoStreamer
}

// Converts a block to an EspressoBatch and starts a goroutine that publishes it to Espresso
// Returns error only if batch conversion fails, otherwise it is infallible, as the goroutine
// will retry publishing until successful.
Expand All @@ -691,7 +680,7 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types.
}

func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) {
err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin)
err := l.EspressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin)
if err != nil {
l.Log.Warn("Failed to refresh Espresso streamer", "err", err)
}
Expand All @@ -706,7 +695,7 @@ func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStat
l.prevCurrentL1 = newSyncStatus.CurrentL1
if syncActions.clearState != nil {
l.channelMgr.Clear(*syncActions.clearState)
l.espressoStreamer.Reset()
l.EspressoStreamer.Reset()
} else {
l.channelMgr.PruneSafeBlocks(syncActions.blocksToPrune)
l.channelMgr.PruneChannels(syncActions.channelsToPrune)
Expand Down Expand Up @@ -755,13 +744,13 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.

l.espressoSyncAndRefresh(ctx, newSyncStatus)

err = l.espressoStreamer.Update(ctx)
err = l.EspressoStreamer.Update(ctx)

var batch *derive.EspressoBatch

for {

batch = l.espressoStreamer.Next(ctx)
batch = l.EspressoStreamer.Next(ctx)

if batch == nil {
break
Expand Down Expand Up @@ -789,7 +778,7 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync.
if err != nil {
l.Log.Error("failed to add L2 block to channel manager", "err", err)
l.clearState(ctx)
l.espressoStreamer.Reset()
l.EspressoStreamer.Reset()
}

l.Log.Info("Added L2 block to channel manager")
Expand Down
136 changes: 83 additions & 53 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client"
espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client"
"github.com/ethereum-optimism/optimism/espresso"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/chaincfg"
"github.com/ethereum-optimism/optimism/op-node/params"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/cliapp"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -92,7 +94,8 @@ type BatcherService struct {
NotSubmittingOnStart bool

opcrypto.ChainSigner
Espresso *espressoClient.MultipleNodesClient
EspressoStreamer espresso.EspressoStreamer[derive.EspressoBatch]
EspressoClient espressoClient.EspressoClient
EspressoLightClient *espressoLightClient.LightclientCaller
Attestation *nitrite.Result
}
Expand Down Expand Up @@ -136,53 +139,6 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
}
opts = append(optsFromRPC, opts...)

if cfg.Espresso.Enabled {
bs.EspressoPollInterval = cfg.Espresso.PollInterval
client, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs)
if err != nil {
return fmt.Errorf("failed to create Espresso client: %w", err)
}
bs.Espresso = client
espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.Espresso.LightClientAddr, bs.L1Client)
if err != nil {
return fmt.Errorf("failed to create Espresso light client")
}
bs.EspressoLightClient = espressoLightClient
bs.UseEspresso = true
if err := bs.initKeyPair(); err != nil {
return fmt.Errorf("failed to create key pair for batcher: %w", err)
}

// try to generate attestationBytes on public key when start batcher
attestationBytes, err := enclave.AttestationWithPublicKey(bs.BatcherPublicKey)
if err != nil {
bs.Log.Info("Not running in enclave, skipping attestation", "info", err)

// Replace ephemeral keys with configured keys, as in devnet they'll be pre-approved for batching
privateKey := cfg.Espresso.TestingBatcherPrivateKey
if privateKey == nil {
return fmt.Errorf("when not running in enclave, testing batcher private key should be set")
}

publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return fmt.Errorf("error casting public key to ECDSA")
}

bs.BatcherPrivateKey = privateKey
bs.BatcherPublicKey = publicKeyECDSA
} else {
// output length of attestation
bs.Log.Info("Successfully got attestation. Attestation length", "length", len(attestationBytes))
result, err := nitrite.Verify(attestationBytes, nitrite.VerifyOptions{})
if err != nil {
return fmt.Errorf("Couldn't verify attestation: %w", err)
}
bs.Attestation = result
}
}

if err := bs.initRollupConfig(ctx); err != nil {
return fmt.Errorf("failed to load rollup config: %w", err)
}
Expand All @@ -203,6 +159,9 @@ func (bs *BatcherService) initFromCLIConfig(ctx context.Context, version string,
if err := bs.initPProf(cfg); err != nil {
return fmt.Errorf("failed to init profiling: %w", err)
}
if err := bs.initEspresso(cfg); err != nil {
return fmt.Errorf("failed to init Espresso: %w", err)
}
bs.initDriver(opts...)
if err := bs.initRPCServer(cfg); err != nil {
return fmt.Errorf("failed to start RPC server: %w", err)
Expand Down Expand Up @@ -449,11 +408,10 @@ func (bs *BatcherService) initDriver(opts ...DriverSetupOption) {
ChannelConfig: bs.ChannelConfig,
AltDA: bs.AltDA,

SequencerAddress: bs.TxManager.From(),
ChainSigner: bs.ChainSigner,
Espresso: bs.Espresso,
EspressoLightClient: bs.EspressoLightClient,
Attestation: bs.Attestation,
EspressoStreamer: bs.EspressoStreamer,
EspressoClient: bs.EspressoClient,
ChainSigner: bs.ChainSigner,
Attestation: bs.Attestation,
}
for _, opt := range opts {
opt(&ds)
Expand Down Expand Up @@ -601,3 +559,75 @@ func (bs *BatcherService) HTTPEndpoint() string {
}
return "http://" + bs.rpcServer.Endpoint()
}

func (bs *BatcherService) initEspresso(cfg *CLIConfig) error {
if !cfg.Espresso.Enabled {
return nil
}

bs.UseEspresso = true
bs.EspressoPollInterval = cfg.Espresso.PollInterval

client, err := espressoClient.NewMultipleNodesClient(cfg.Espresso.QueryServiceURLs)
if err != nil {
return fmt.Errorf("failed to create Espresso client: %w", err)
}
bs.EspressoClient = client

espressoLightClient, err := espressoLightClient.NewLightclientCaller(cfg.Espresso.LightClientAddr, bs.L1Client)
if err != nil {
return fmt.Errorf("failed to create Espresso light client")
}
bs.EspressoLightClient = espressoLightClient

if err := bs.initKeyPair(); err != nil {
return fmt.Errorf("failed to create key pair for batcher: %w", err)
}

unbufferedStreamer := espresso.NewEspressoStreamer(
bs.RollupConfig.L2ChainID.Uint64(),
NewAdaptL1BlockRefClient(bs.L1Client),
client,
bs.EspressoLightClient,
bs.Log,
func(data []byte) (*derive.EspressoBatch, error) {
return derive.UnmarshalEspressoTransaction(data, bs.TxManager.From())
},
2*time.Second,
)
unbufferedStreamer.UseFetchApi = cfg.Espresso.UseFetchAPI

// We wrap the streamer in a BufferedStreamer to reduce impact of streamer resets
bs.EspressoStreamer = espresso.NewBufferedEspressoStreamer(unbufferedStreamer)
Copy link
Copy Markdown
Collaborator

@philippecamacho philippecamacho Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

espresso.NewBufferedEspressoStreamer(unbufferedStreamer)

Could you add a comment (or do some renaming) to explain why we pass an unbuffered Streamer to a NewBufferedEspressoStreamer constructor?


// try to generate attestationBytes on public key when start batcher
attestationBytes, err := enclave.AttestationWithPublicKey(bs.BatcherPublicKey)
if err != nil {
bs.Log.Info("Not running in enclave, skipping attestation", "info", err)

// Replace ephemeral keys with configured keys, as in devnet they'll be pre-approved for batching
privateKey := cfg.Espresso.TestingBatcherPrivateKey
if privateKey == nil {
return fmt.Errorf("when not running in enclave, testing batcher private key should be set")
}

publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return fmt.Errorf("error casting public key to ECDSA")
}

bs.BatcherPrivateKey = privateKey
bs.BatcherPublicKey = publicKeyECDSA
} else {
// output length of attestation
bs.Log.Info("Successfully got attestation. Attestation length", "length", len(attestationBytes))
result, err := nitrite.Verify(attestationBytes, nitrite.VerifyOptions{})
if err != nil {
return fmt.Errorf("Couldn't verify attestation: %w", err)
}
bs.Attestation = result
}

return nil
}
1 change: 1 addition & 0 deletions op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetche
},
cfg.CaffNodeConfig.PollInterval,
)
streamer.UseFetchApi = cfg.CaffNodeConfig.UseFetchAPI

log.Debug("Espresso Streamer namespace:", streamer.Namespace)

Expand Down
Loading