Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,6 @@ config/jwt.txt
*.pem

packages/contracts-bedrock/lib/automate/

# AI tools
.claude
74 changes: 62 additions & 12 deletions espresso/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,31 @@ func espressoEnvs(envprefix, v string) []string {
return []string{envprefix + "_ESPRESSO_" + v}
}

// Default values for batch submission receipt verification tuning.
// Defined here so that both the CLI flag defaults and the batcher logic
// can reference a single source of truth.
const (
DefaultVerifyReceiptMaxBlocks uint64 = 5
DefaultVerifyReceiptSafetyTimeout time.Duration = 5 * time.Minute
DefaultVerifyReceiptRetryDelay time.Duration = 100 * time.Millisecond
)

var (
EnabledFlagName = espressoFlags("enabled")
PollIntervalFlagName = espressoFlags("poll-interval")
QueryServiceUrlsFlagName = espressoFlags("urls")
LightClientAddrFlagName = espressoFlags("light-client-addr")
L1UrlFlagName = espressoFlags("l1-url")
TestingBatcherPrivateKeyFlagName = espressoFlags("testing-batcher-private-key")
CaffeinationHeightEspresso = espressoFlags("origin-height-espresso")
CaffeinationHeightL2 = espressoFlags("origin-height-l2")
NamespaceFlagName = espressoFlags("namespace")
RollupL1UrlFlagName = espressoFlags("rollup-l1-url")
AttestationServiceFlagName = espressoFlags("espresso-attestation-service")
BatchAuthenticatorAddrFlagName = espressoFlags("batch-authenticator-addr")
EnabledFlagName = espressoFlags("enabled")
PollIntervalFlagName = espressoFlags("poll-interval")
QueryServiceUrlsFlagName = espressoFlags("urls")
LightClientAddrFlagName = espressoFlags("light-client-addr")
L1UrlFlagName = espressoFlags("l1-url")
TestingBatcherPrivateKeyFlagName = espressoFlags("testing-batcher-private-key")
CaffeinationHeightEspresso = espressoFlags("origin-height-espresso")
CaffeinationHeightL2 = espressoFlags("origin-height-l2")
NamespaceFlagName = espressoFlags("namespace")
RollupL1UrlFlagName = espressoFlags("rollup-l1-url")
AttestationServiceFlagName = espressoFlags("espresso-attestation-service")
BatchAuthenticatorAddrFlagName = espressoFlags("batch-authenticator-addr")
VerifyReceiptMaxBlocksFlagName = espressoFlags("verify-receipt-max-blocks")
VerifyReceiptSafetyTimeoutFlagName = espressoFlags("verify-receipt-safety-timeout")
VerifyReceiptRetryDelayFlagName = espressoFlags("verify-receipt-retry-delay")
)

func CLIFlags(envPrefix string, category string) []cli.Flag {
Expand Down Expand Up @@ -119,6 +131,27 @@ func CLIFlags(envPrefix string, category string) []cli.Flag {
EnvVars: espressoEnvs(envPrefix, "BATCH_AUTHENTICATOR_ADDR"),
Category: category,
},
&cli.Uint64Flag{
Name: VerifyReceiptMaxBlocksFlagName,
Usage: "Number of HotShot blocks to wait for a submitted transaction to become queryable before re-submitting",
Value: DefaultVerifyReceiptMaxBlocks,
EnvVars: espressoEnvs(envPrefix, "VERIFY_RECEIPT_MAX_BLOCKS"),
Category: category,
},
&cli.DurationFlag{
Name: VerifyReceiptSafetyTimeoutFlagName,
Usage: "Wall-clock backstop for receipt verification; re-submits the transaction if this duration is exceeded",
Value: DefaultVerifyReceiptSafetyTimeout,
EnvVars: espressoEnvs(envPrefix, "VERIFY_RECEIPT_SAFETY_TIMEOUT"),
Category: category,
},
&cli.DurationFlag{
Name: VerifyReceiptRetryDelayFlagName,
Usage: "Delay between receipt verification retries",
Value: DefaultVerifyReceiptRetryDelay,
EnvVars: espressoEnvs(envPrefix, "VERIFY_RECEIPT_RETRY_DELAY"),
Category: category,
},
}
}

Expand All @@ -136,6 +169,11 @@ type CLIConfig struct {
CaffeinationHeightL2 uint64
EspressoAttestationService string

// Batch submission receipt verification tuning
VerifyReceiptMaxBlocks uint64
Comment thread
philippecamacho marked this conversation as resolved.
VerifyReceiptSafetyTimeout time.Duration
VerifyReceiptRetryDelay time.Duration

// Non directly configurable option
allowEmptyAttestationService bool `json:"-"`
}
Expand Down Expand Up @@ -169,6 +207,15 @@ func (c CLIConfig) Check() error {
if !c.allowEmptyAttestationService && c.EspressoAttestationService == "" {
return fmt.Errorf("attestation service URL is required when Espresso is enabled")
}
if c.VerifyReceiptMaxBlocks == 0 {
return fmt.Errorf("verify-receipt-max-blocks must be > 0")
}
if c.VerifyReceiptSafetyTimeout <= 0 {
return fmt.Errorf("verify-receipt-safety-timeout must be > 0")
}
if c.VerifyReceiptRetryDelay <= 0 {
return fmt.Errorf("verify-receipt-retry-delay must be > 0")
}
}
return nil
}
Expand All @@ -183,6 +230,9 @@ func ReadCLIConfig(c *cli.Context) CLIConfig {
CaffeinationHeightEspresso: c.Uint64(CaffeinationHeightEspresso),
CaffeinationHeightL2: c.Uint64(CaffeinationHeightL2),
EspressoAttestationService: c.String(AttestationServiceFlagName),
VerifyReceiptMaxBlocks: c.Uint64(VerifyReceiptMaxBlocksFlagName),
VerifyReceiptSafetyTimeout: c.Duration(VerifyReceiptSafetyTimeoutFlagName),
VerifyReceiptRetryDelay: c.Duration(VerifyReceiptRetryDelayFlagName),
}

config.QueryServiceURLs = c.StringSlice(QueryServiceUrlsFlagName)
Expand Down
10 changes: 5 additions & 5 deletions espresso/environment/optitmism_espresso_test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ func launchEspressoDevNodeStartOption(ct *E2eDevnetLauncherContext) e2esys.Start

l1EthRpcURLPtr, err := url.Parse(c.L1EthRpc)
if err != nil {
ct.Error = FailedToDetermineL1RPCURL{Cause: err}
ct.T.Fatalf("failed to parse L1 RPC URL %q: %v", c.L1EthRpc, err)
return
}

Expand All @@ -834,29 +834,29 @@ func launchEspressoDevNodeStartOption(ct *E2eDevnetLauncherContext) e2esys.Start
// Let's spin up the espresso-dev-node
l1EthRpcURL, err := translateContainerToNodeURL(*l1EthRpcURLPtr, network)
if err != nil {
ct.Error = err
ct.T.Fatalf("failed to translate L1 RPC URL for Docker: %v", err)
return
}

dockerConfig, portRemapping, err := determineEspressoDevNodeDockerContainerConfig(l1EthRpcURL, network)
if err != nil {
ct.Error = err
ct.T.Fatalf("failed to build espresso dev node Docker config: %v", err)
return
}

containerCli := new(DockerCli)

espressoDevNodeContainerInfo, err := containerCli.LaunchContainer(ct.Ctx, dockerConfig)
if err != nil {
ct.Error = FailedToLaunchDockerContainer{Cause: err}
ct.T.Fatalf("failed to launch espresso dev node container: %v", err)
return
}

ensureHardCodedPortsAreMappedFromTheirOriginalValues(&espressoDevNodeContainerInfo, portRemapping, network)

// Wait for Espresso to be ready
if err := waitForEspressoToFinishSpinningUp(ct, espressoDevNodeContainerInfo); err != nil {
ct.Error = err
ct.T.Fatalf("espresso dev node failed to become ready: %v", err)
return
}

Expand Down
3 changes: 3 additions & 0 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
WithContext(l.shutdownCtx),
WithWaitGroup(l.wg),
WithEspressoClient(l.Espresso),
WithVerifyReceiptMaxBlocks(l.Config.VerifyReceiptMaxBlocks),
WithVerifyReceiptSafetyTimeout(l.Config.VerifyReceiptSafetyTimeout),
WithVerifyReceiptRetryDelay(l.Config.VerifyReceiptRetryDelay),
)
l.espressoSubmitter.SpawnWorkers(4, 4)
l.espressoSubmitter.Start()
Expand Down
113 changes: 70 additions & 43 deletions op-batcher/batcher/espresso.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/signer/core/apitypes"

"github.com/ethereum-optimism/optimism/espresso"
"github.com/ethereum-optimism/optimism/espresso/bindings"
"github.com/ethereum-optimism/optimism/espresso/logmodule"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
Expand Down Expand Up @@ -109,16 +110,19 @@ type espressoVerifyReceiptJobAttempt struct {
// the worker queue processing details for submitting transactions to Espresso
// without spawning arbitrarily many goroutines.
type espressoTransactionSubmitter struct {
ctx context.Context
wg *sync.WaitGroup
submitJobQueue chan espressoSubmitTransactionJob
submitRespQueue chan espressoSubmitTransactionJobResponse
submitWorkerQueue chan chan espressoTransactionJobAttempt
verifyReceiptJobQueue chan espressoVerifyReceiptJob
verifyReceiptRespQueue chan espressoVerifyReceiptJobResponse
verifyReceiptWorkerQueue chan chan espressoVerifyReceiptJobAttempt
espresso espressoClient.EspressoClient
latestBlockHeight atomic.Uint64 // shared HotShot block height, updated by trackBlockHeight
ctx context.Context
wg *sync.WaitGroup
submitJobQueue chan espressoSubmitTransactionJob
submitRespQueue chan espressoSubmitTransactionJobResponse
submitWorkerQueue chan chan espressoTransactionJobAttempt
verifyReceiptJobQueue chan espressoVerifyReceiptJob
verifyReceiptRespQueue chan espressoVerifyReceiptJobResponse
verifyReceiptWorkerQueue chan chan espressoVerifyReceiptJobAttempt
espresso espressoClient.EspressoClient
latestBlockHeight atomic.Uint64 // shared HotShot block height, updated by trackBlockHeight
verifyReceiptMaxBlocks uint64
verifyReceiptSafetyTimeout time.Duration
verifyReceiptRetryDelay time.Duration
}

// EspressoTransactionSubmitterConfig is a configuration struct for the
Expand All @@ -132,6 +136,9 @@ type EspressoTransactionSubmitterConfig struct {
SubmitResponseQueueCapacity int
VerifyReceiptJobQueueCapacity int
VerifyReceiptResponseQueueCapacity int
VerifyReceiptMaxBlocks uint64
VerifyReceiptSafetyTimeout time.Duration
VerifyReceiptRetryDelay time.Duration
}

// EspressoTransactionSubmitterOption is a function that can be used to
Expand Down Expand Up @@ -162,6 +169,30 @@ func WithWaitGroup(wg *sync.WaitGroup) EspressoTransactionSubmitterOption {
}
}

// WithVerifyReceiptMaxBlocks sets the number of HotShot blocks to wait for a
// submitted transaction to become queryable before re-submitting.
func WithVerifyReceiptMaxBlocks(n uint64) EspressoTransactionSubmitterOption {
return func(config *EspressoTransactionSubmitterConfig) {
config.VerifyReceiptMaxBlocks = n
}
}

// WithVerifyReceiptSafetyTimeout sets the wall-clock backstop for receipt
// verification. If the block height tracker is stale or broken, re-submission
// is triggered after this duration.
func WithVerifyReceiptSafetyTimeout(d time.Duration) EspressoTransactionSubmitterOption {
return func(config *EspressoTransactionSubmitterConfig) {
config.VerifyReceiptSafetyTimeout = d
}
}

// WithVerifyReceiptRetryDelay sets the delay between receipt verification retries.
func WithVerifyReceiptRetryDelay(d time.Duration) EspressoTransactionSubmitterOption {
return func(config *EspressoTransactionSubmitterConfig) {
config.VerifyReceiptRetryDelay = d
}
}

// NewEspressoTransactionSubmitter creates a new EspressoTransactionSubmitter
// with the given context and espresso client. It will create a new transaction
// submitter with some default options, and apply those options to the
Expand All @@ -180,6 +211,9 @@ func NewEspressoTransactionSubmitter(options ...EspressoTransactionSubmitterOpti
SubmitResponseQueueCapacity: 10,
VerifyReceiptJobQueueCapacity: 1024,
VerifyReceiptResponseQueueCapacity: 10,
VerifyReceiptMaxBlocks: espresso.DefaultVerifyReceiptMaxBlocks,
VerifyReceiptSafetyTimeout: espresso.DefaultVerifyReceiptSafetyTimeout,
VerifyReceiptRetryDelay: espresso.DefaultVerifyReceiptRetryDelay,
}

for _, option := range options {
Expand All @@ -191,15 +225,18 @@ func NewEspressoTransactionSubmitter(options ...EspressoTransactionSubmitterOpti
}

return &espressoTransactionSubmitter{
ctx: config.Ctx,
wg: config.Wg,
submitJobQueue: make(chan espressoSubmitTransactionJob, config.SubmitJobQueueCapacity),
submitRespQueue: make(chan espressoSubmitTransactionJobResponse, config.SubmitResponseQueueCapacity),
submitWorkerQueue: make(chan chan espressoTransactionJobAttempt),
verifyReceiptJobQueue: make(chan espressoVerifyReceiptJob, config.VerifyReceiptJobQueueCapacity),
verifyReceiptRespQueue: make(chan espressoVerifyReceiptJobResponse, config.VerifyReceiptResponseQueueCapacity),
verifyReceiptWorkerQueue: make(chan chan espressoVerifyReceiptJobAttempt),
espresso: config.EspressoClient,
ctx: config.Ctx,
wg: config.Wg,
submitJobQueue: make(chan espressoSubmitTransactionJob, config.SubmitJobQueueCapacity),
submitRespQueue: make(chan espressoSubmitTransactionJobResponse, config.SubmitResponseQueueCapacity),
submitWorkerQueue: make(chan chan espressoTransactionJobAttempt),
verifyReceiptJobQueue: make(chan espressoVerifyReceiptJob, config.VerifyReceiptJobQueueCapacity),
verifyReceiptRespQueue: make(chan espressoVerifyReceiptJobResponse, config.VerifyReceiptResponseQueueCapacity),
verifyReceiptWorkerQueue: make(chan chan espressoVerifyReceiptJobAttempt),
espresso: config.EspressoClient,
verifyReceiptMaxBlocks: config.VerifyReceiptMaxBlocks,
verifyReceiptSafetyTimeout: config.VerifyReceiptSafetyTimeout,
verifyReceiptRetryDelay: config.VerifyReceiptRetryDelay,
}
}

Expand Down Expand Up @@ -315,22 +352,11 @@ func (s *espressoTransactionSubmitter) handleTransactionSubmitJobResponse() {
}
}

// VERIFY_RECEIPT_MAX_BLOCKS is the number of HotShot blocks we will wait
// for a submitted transaction to become queryable before re-submitting.
// Using block count instead of wall-clock time makes us resilient to
// variable block times across different Espresso networks.
const VERIFY_RECEIPT_MAX_BLOCKS uint64 = 5

// VERIFY_RECEIPT_SAFETY_TIMEOUT is a wall-clock backstop for receipt
// verification. If the block height tracker is stale or broken, we fall
// back to this generous timeout before re-submitting.
const VERIFY_RECEIPT_SAFETY_TIMEOUT = 5 * time.Minute

// VERIFY_RECEIPT_RETRY_DELAY is the amount of time we will wait before
// retrying a job that failed to verify the receipt.
const VERIFY_RECEIPT_RETRY_DELAY = 100 * time.Millisecond
// Default values for receipt verification tuning are defined as exported
// constants in the espresso package (espresso.DefaultVerifyReceipt*) so that
// the CLI flag defaults and this batcher logic share a single source of truth.

// Evaluate the verification job.
// evaluateVerification evaluates the verification job response.
//
// # Returns
//
Expand All @@ -343,7 +369,7 @@ const VERIFY_RECEIPT_RETRY_DELAY = 100 * time.Millisecond
// * If the wall-clock safety timeout is exceeded: RetrySubmission.
//
// * Otherwise: RetryVerification.
func evaluateVerification(jobResp espressoVerifyReceiptJobResponse) JobEvaluation {
func (s *espressoTransactionSubmitter) evaluateVerification(jobResp espressoVerifyReceiptJobResponse) JobEvaluation {
err := jobResp.err

// If there's no error, continue handling the verification.
Expand All @@ -363,20 +389,20 @@ func evaluateVerification(jobResp espressoVerifyReceiptJobResponse) JobEvaluatio
// Block-count-based timeout: re-submit if enough HotShot blocks have
// passed since verification started. The startHeight guard handles the
// edge case where the height tracker hasn't fetched its first value yet.
if jobResp.job.startHeight > 0 && jobResp.currentHeight >= jobResp.job.startHeight+VERIFY_RECEIPT_MAX_BLOCKS {
if jobResp.job.startHeight > 0 && jobResp.currentHeight >= jobResp.job.startHeight+s.verifyReceiptMaxBlocks {
log.Info("Verification timed out by block count, re-submitting",
"startHeight", jobResp.job.startHeight,
"currentHeight", jobResp.currentHeight,
"maxBlocks", VERIFY_RECEIPT_MAX_BLOCKS)
"maxBlocks", s.verifyReceiptMaxBlocks)
return RetrySubmission
}

// Wall-clock safety backstop in case the block height tracker is stale
// or broken (e.g., query service returning old data).
if elapsed := time.Since(jobResp.job.startTime); elapsed > VERIFY_RECEIPT_SAFETY_TIMEOUT {
if elapsed := time.Since(jobResp.job.startTime); elapsed > s.verifyReceiptSafetyTimeout {
log.Warn("Verification timed out by safety timeout, re-submitting",
"elapsed", elapsed,
"safetyTimeout", VERIFY_RECEIPT_SAFETY_TIMEOUT)
"safetyTimeout", s.verifyReceiptSafetyTimeout)
return RetrySubmission
}

Expand Down Expand Up @@ -412,7 +438,7 @@ func (s *espressoTransactionSubmitter) handleVerifyReceiptJobResponse() {
}
}

switch evaluation := evaluateVerification(jobResp); evaluation {
switch evaluation := s.evaluateVerification(jobResp); evaluation {
case Skip:
continue
case RetrySubmission:
Expand Down Expand Up @@ -601,6 +627,7 @@ func espressoVerifyTransactionWorker(
cli espressoClient.EspressoClient,
workerQueue chan<- chan espressoVerifyReceiptJobAttempt,
latestHeight *atomic.Uint64,
retryDelay time.Duration,
) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down Expand Up @@ -640,7 +667,7 @@ func espressoVerifyTransactionWorker(
// We have already attempted this job, so we will wait a bit
// NOTE: this prevents this worker from being able to process
// other jobs while we wait for this delay.
time.Sleep(VERIFY_RECEIPT_RETRY_DELAY)
time.Sleep(retryDelay)
}

_, err := cli.FetchTransactionByHash(ctx, jobAttempt.job.hash)
Expand Down Expand Up @@ -673,7 +700,7 @@ func (s *espressoTransactionSubmitter) SpawnWorkers(numSubmitTransactionWorkers,

for i := 0; i < numVerifyReceiptWorkers; i++ {
s.wg.Add(1)
go espressoVerifyTransactionWorker(workersCtx, s.wg, s.espresso, s.verifyReceiptWorkerQueue, &s.latestBlockHeight)
go espressoVerifyTransactionWorker(workersCtx, s.wg, s.espresso, s.verifyReceiptWorkerQueue, &s.latestBlockHeight, s.verifyReceiptRetryDelay)
}
}

Expand All @@ -691,7 +718,7 @@ func (s *espressoTransactionSubmitter) trackBlockHeight() {

// Wait for the next interval or until context is done.
select {
case <-time.After(VERIFY_RECEIPT_RETRY_DELAY):
case <-time.After(s.verifyReceiptRetryDelay):
case <-s.ctx.Done():
return
}
Expand Down
Loading
Loading