-
Notifications
You must be signed in to change notification settings - Fork 238
fix(execution/evm): verify payload status #2863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6a7e857
1a005f8
fd5d8a2
2ba0a22
4e719fc
69b54d1
c1dff1d
50304a8
7b59bc7
eb3d0f4
58d7c2b
a00d4ea
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,14 +22,94 @@ import ( | |
| "github.com/evstack/ev-node/core/execution" | ||
| ) | ||
|
|
||
| const ( | ||
| // MaxPayloadStatusRetries is the maximum number of retries for SYNCING status. | ||
| // According to the Engine API specification, SYNCING indicates temporary unavailability | ||
| // and should be retried with exponential backoff. | ||
| MaxPayloadStatusRetries = 3 | ||
| // InitialRetryBackoff is the initial backoff duration for retries. | ||
| // The backoff doubles on each retry attempt (exponential backoff). | ||
| InitialRetryBackoff = 1 * time.Second | ||
| ) | ||
|
|
||
| var ( | ||
| // ErrInvalidPayloadStatus indicates that EVM returned status != VALID | ||
| // ErrInvalidPayloadStatus indicates that the execution engine returned a permanent | ||
| // failure status (INVALID or unknown status). This error should not be retried. | ||
| ErrInvalidPayloadStatus = errors.New("invalid payload status") | ||
| // ErrPayloadSyncing indicates that the execution engine is temporarily syncing. | ||
| // According to the Engine API specification, this is a transient condition that | ||
| // should be handled with retry logic rather than immediate failure. | ||
| ErrPayloadSyncing = errors.New("payload syncing") | ||
| ) | ||
|
|
||
| // Ensure EngineAPIExecutionClient implements the execution.Execute interface | ||
| var _ execution.Executor = (*EngineClient)(nil) | ||
|
|
||
| // validatePayloadStatus checks the payload status and returns appropriate errors. | ||
| // It implements the Engine API specification's status handling: | ||
| // - VALID: Operation succeeded, return nil | ||
| // - SYNCING/ACCEPTED: Temporary unavailability, return ErrPayloadSyncing for retry | ||
| // - INVALID: Permanent failure, return ErrInvalidPayloadStatus (no retry) | ||
| // - Unknown: Treat as permanent failure (no retry) | ||
| func validatePayloadStatus(status engine.PayloadStatusV1) error { | ||
| switch status.Status { | ||
| case engine.VALID: | ||
| return nil | ||
| case engine.SYNCING, engine.ACCEPTED: | ||
| // SYNCING and ACCEPTED indicate temporary unavailability - should retry | ||
| return ErrPayloadSyncing | ||
| case engine.INVALID: | ||
| // INVALID is a permanent failure - should not retry | ||
| return ErrInvalidPayloadStatus | ||
| default: | ||
| // Unknown status - treat as invalid | ||
| return ErrInvalidPayloadStatus | ||
| } | ||
| } | ||
|
|
||
| // retryWithBackoff executes a function with exponential backoff retry logic. | ||
| // It implements the Engine API specification's recommendation to retry SYNCING | ||
| // status with exponential backoff. The function: | ||
| // - Retries only on ErrPayloadSyncing (transient failures) | ||
| // - Fails immediately on ErrInvalidPayloadStatus (permanent failures) | ||
| // - Respects context cancellation for graceful shutdown | ||
| // - Uses exponential backoff that doubles on each attempt | ||
| func retryWithBackoffOnPayloadStatus(ctx context.Context, fn func() error, maxRetries int, initialBackoff time.Duration, operation string) error { | ||
| backoff := initialBackoff | ||
|
|
||
| for attempt := 1; attempt <= maxRetries; attempt++ { | ||
| err := fn() | ||
| if err == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // Don't retry on invalid status | ||
| if errors.Is(err, ErrInvalidPayloadStatus) { | ||
| return err | ||
| } | ||
|
|
||
| // Only retry on syncing status | ||
| if !errors.Is(err, ErrPayloadSyncing) { | ||
| return err | ||
| } | ||
|
|
||
| // Check if we've exhausted retries | ||
| if attempt >= maxRetries { | ||
| return fmt.Errorf("max retries (%d) exceeded for %s: %w", maxRetries, operation, err) | ||
| } | ||
|
|
||
| // Wait with exponential backoff | ||
| select { | ||
| case <-ctx.Done(): | ||
| return fmt.Errorf("context cancelled during retry for %s: %w", operation, ctx.Err()) | ||
| case <-time.After(backoff): | ||
| backoff *= 2 | ||
| } | ||
| } | ||
|
|
||
| return fmt.Errorf("max retries (%d) exceeded for %s", maxRetries, operation) | ||
julienrbrt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // EngineClient represents a client that interacts with an Ethereum execution engine | ||
| // through the Engine API. It manages connections to both the engine and standard Ethereum | ||
| // APIs, and maintains state related to block processing. | ||
|
|
@@ -105,28 +185,35 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini | |
| return nil, 0, fmt.Errorf("initialHeight must be 1, got %d", initialHeight) | ||
| } | ||
|
|
||
| // Acknowledge the genesis block | ||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", | ||
| engine.ForkchoiceStateV1{ | ||
| HeadBlockHash: c.genesisHash, | ||
| SafeBlockHash: c.genesisHash, | ||
| FinalizedBlockHash: c.genesisHash, | ||
| }, | ||
| nil, | ||
| ) | ||
| if err != nil { | ||
| return nil, 0, fmt.Errorf("engine_forkchoiceUpdatedV3 failed: %w", err) | ||
| } | ||
| // Acknowledge the genesis block with retry logic for SYNCING status | ||
| err := retryWithBackoffOnPayloadStatus(ctx, func() error { | ||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", | ||
| engine.ForkchoiceStateV1{ | ||
| HeadBlockHash: c.genesisHash, | ||
| SafeBlockHash: c.genesisHash, | ||
| FinalizedBlockHash: c.genesisHash, | ||
| }, | ||
| nil, | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("engine_forkchoiceUpdatedV3 failed: %w", err) | ||
| } | ||
|
|
||
| // Validate payload status | ||
| if forkchoiceResult.PayloadStatus.Status != engine.VALID { | ||
| c.logger.Warn(). | ||
| Str("status", string(forkchoiceResult.PayloadStatus.Status)). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Msg("InitChain: engine_forkchoiceUpdatedV3 returned non-VALID status") | ||
| return nil, 0, fmt.Errorf("%w: status=%s", ErrInvalidPayloadStatus, forkchoiceResult.PayloadStatus.Status) | ||
| // Validate payload status | ||
| if err := validatePayloadStatus(forkchoiceResult.PayloadStatus); err != nil { | ||
| c.logger.Warn(). | ||
| Str("status", forkchoiceResult.PayloadStatus.Status). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Msg("InitChain: engine_forkchoiceUpdatedV3 returned non-VALID status") | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| }, MaxPayloadStatusRetries, InitialRetryBackoff, "InitChain") | ||
| if err != nil { | ||
| return nil, 0, err | ||
| } | ||
|
|
||
| _, stateRoot, gasLimit, _, err := c.getBlockInfo(ctx, 0) | ||
|
|
@@ -184,7 +271,7 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight | |
|
|
||
| // update forkchoice to get the next payload id | ||
| // Create evolve-compatible payloadtimestamp.Unix() | ||
| evPayloadAttrs := map[string]interface{}{ | ||
| evPayloadAttrs := map[string]any{ | ||
| // Standard Ethereum payload attributes (flattened) - using camelCase as expected by JSON | ||
| "timestamp": timestamp.Unix(), | ||
| "prevRandao": c.derivePrevRandao(blockHeight), | ||
|
|
@@ -202,64 +289,82 @@ func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight | |
| Int("tx_count", len(txs)). | ||
| Msg("engine_forkchoiceUpdatedV3") | ||
|
|
||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err = c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, evPayloadAttrs) | ||
| if err != nil { | ||
| return nil, 0, fmt.Errorf("forkchoice update failed: %w", err) | ||
| } | ||
| // Call forkchoice update with retry logic for SYNCING status | ||
| var payloadID *engine.PayloadID | ||
| err = retryWithBackoffOnPayloadStatus(ctx, func() error { | ||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, evPayloadAttrs) | ||
| if err != nil { | ||
| return fmt.Errorf("forkchoice update failed: %w", err) | ||
| } | ||
|
|
||
| // Validate payload status | ||
| if forkchoiceResult.PayloadStatus.Status != engine.VALID { | ||
| c.logger.Warn(). | ||
| Str("status", string(forkchoiceResult.PayloadStatus.Status)). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Uint64("blockHeight", blockHeight). | ||
| Msg("ExecuteTxs: engine_forkchoiceUpdatedV3 returned non-VALID status") | ||
| return nil, 0, fmt.Errorf("%w: status=%s", ErrInvalidPayloadStatus, forkchoiceResult.PayloadStatus.Status) | ||
| } | ||
| // Validate payload status | ||
| if err := validatePayloadStatus(forkchoiceResult.PayloadStatus); err != nil { | ||
| c.logger.Warn(). | ||
| Str("status", forkchoiceResult.PayloadStatus.Status). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Uint64("blockHeight", blockHeight). | ||
| Msg("ExecuteTxs: engine_forkchoiceUpdatedV3 returned non-VALID status") | ||
| return err | ||
| } | ||
|
|
||
| if forkchoiceResult.PayloadID == nil { | ||
| c.logger.Error(). | ||
| Str("status", forkchoiceResult.PayloadStatus.Status). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Interface("forkchoiceState", args). | ||
| Interface("payloadAttributes", evPayloadAttrs). | ||
| Uint64("blockHeight", blockHeight). | ||
| Msg("returned nil PayloadID") | ||
|
|
||
| return fmt.Errorf("returned nil PayloadID - (status: %s, latestValidHash: %s)", | ||
| forkchoiceResult.PayloadStatus.Status, | ||
| forkchoiceResult.PayloadStatus.LatestValidHash.Hex()) | ||
| } | ||
|
|
||
| if forkchoiceResult.PayloadID == nil { | ||
| c.logger.Error(). | ||
| Str("status", string(forkchoiceResult.PayloadStatus.Status)). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Interface("forkchoiceState", args). | ||
| Interface("payloadAttributes", evPayloadAttrs). | ||
| Uint64("blockHeight", blockHeight). | ||
| Msg("returned nil PayloadID") | ||
|
|
||
| return nil, 0, fmt.Errorf("returned nil PayloadID - (status: %s, latestValidHash: %s)", | ||
| forkchoiceResult.PayloadStatus.Status, | ||
| forkchoiceResult.PayloadStatus.LatestValidHash.Hex()) | ||
| payloadID = forkchoiceResult.PayloadID | ||
| return nil | ||
| }, MaxPayloadStatusRetries, InitialRetryBackoff, "ExecuteTxs forkchoice") | ||
| if err != nil { | ||
| return nil, 0, err | ||
| } | ||
|
|
||
| // get payload | ||
| var payloadResult engine.ExecutionPayloadEnvelope | ||
| err = c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", *forkchoiceResult.PayloadID) | ||
| err = c.engineClient.CallContext(ctx, &payloadResult, "engine_getPayloadV4", *payloadID) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should there be a retry for this call, too?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have checked and ExecutionPayloadEnvelope doesn't return a payload status, so it cannot be retried with the same logic. I am updated the API naming to clarify the retry function usage (a00d4ea) |
||
| if err != nil { | ||
| return nil, 0, fmt.Errorf("get payload failed: %w", err) | ||
| } | ||
|
|
||
| // submit payload | ||
| // Submit payload with retry logic for SYNCING status | ||
| var newPayloadResult engine.PayloadStatusV1 | ||
| err = c.engineClient.CallContext(ctx, &newPayloadResult, "engine_newPayloadV4", | ||
| payloadResult.ExecutionPayload, | ||
| []string{}, // No blob hashes | ||
| common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot (same as in payload attributes) | ||
| [][]byte{}, // No execution requests | ||
| ) | ||
| if err != nil { | ||
| return nil, 0, fmt.Errorf("new payload submission failed: %w", err) | ||
| } | ||
| err = retryWithBackoffOnPayloadStatus(ctx, func() error { | ||
| err := c.engineClient.CallContext(ctx, &newPayloadResult, "engine_newPayloadV4", | ||
| payloadResult.ExecutionPayload, | ||
| []string{}, // No blob hashes | ||
| common.Hash{}.Hex(), // Use zero hash for parentBeaconBlockRoot (same as in payload attributes) | ||
| [][]byte{}, // No execution requests | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("new payload submission failed: %w", err) | ||
| } | ||
|
|
||
| if newPayloadResult.Status != engine.VALID { | ||
| c.logger.Warn(). | ||
| Str("status", string(newPayloadResult.Status)). | ||
| Str("latestValidHash", newPayloadResult.LatestValidHash.Hex()). | ||
| Interface("validationError", newPayloadResult.ValidationError). | ||
| Msg("engine_newPayloadV4 returned non-VALID status") | ||
| return nil, 0, ErrInvalidPayloadStatus | ||
| // Validate payload status | ||
| if err := validatePayloadStatus(newPayloadResult); err != nil { | ||
| c.logger.Warn(). | ||
| Str("status", newPayloadResult.Status). | ||
| Str("latestValidHash", newPayloadResult.LatestValidHash.Hex()). | ||
| Interface("validationError", newPayloadResult.ValidationError). | ||
| Uint64("blockHeight", blockHeight). | ||
| Msg("engine_newPayloadV4 returned non-VALID status") | ||
| return err | ||
| } | ||
| return nil | ||
| }, MaxPayloadStatusRetries, InitialRetryBackoff, "ExecuteTxs newPayload") | ||
| if err != nil { | ||
| return nil, 0, err | ||
| } | ||
|
|
||
| // forkchoice update | ||
|
|
@@ -290,19 +395,27 @@ func (c *EngineClient) setFinal(ctx context.Context, blockHash common.Hash, isFi | |
| } | ||
| c.mu.Unlock() | ||
|
|
||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, nil) | ||
| if err != nil { | ||
| return fmt.Errorf("forkchoice update failed with error: %w", err) | ||
| } | ||
| // Call forkchoice update with retry logic for SYNCING status | ||
| err := retryWithBackoffOnPayloadStatus(ctx, func() error { | ||
| var forkchoiceResult engine.ForkChoiceResponse | ||
| err := c.engineClient.CallContext(ctx, &forkchoiceResult, "engine_forkchoiceUpdatedV3", args, nil) | ||
| if err != nil { | ||
| return fmt.Errorf("forkchoice update failed: %w", err) | ||
| } | ||
|
|
||
| if forkchoiceResult.PayloadStatus.Status != engine.VALID { | ||
| c.logger.Warn(). | ||
| Str("status", string(forkchoiceResult.PayloadStatus.Status)). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Msg("forkchoiceUpdatedV3 returned non-VALID status") | ||
| return ErrInvalidPayloadStatus | ||
| // Validate payload status | ||
| if err := validatePayloadStatus(forkchoiceResult.PayloadStatus); err != nil { | ||
| c.logger.Warn(). | ||
| Str("status", forkchoiceResult.PayloadStatus.Status). | ||
| Str("latestValidHash", forkchoiceResult.PayloadStatus.LatestValidHash.Hex()). | ||
| Interface("validationError", forkchoiceResult.PayloadStatus.ValidationError). | ||
| Msg("forkchoiceUpdatedV3 returned non-VALID status") | ||
| return err | ||
| } | ||
| return nil | ||
| }, MaxPayloadStatusRetries, InitialRetryBackoff, "setFinal") | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.