-
Notifications
You must be signed in to change notification settings - Fork 72
ZK to MPT #827
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ZK to MPT #827
Changes from 6 commits
2cdfb07
7c11f8c
98925c5
c34bee9
e39a59a
105c335
a3d76f2
8ae80bd
d5f2d8d
fa2307f
c38ea30
8b2a833
c9f508d
0968304
f4d7b0c
32512d0
b5e977c
5f6ad17
8631ca8
5dea362
772df41
75181a7
6120b0a
89218d0
0a63c6b
a438b20
8abcf7c
d6dd008
abcc805
57cf1e4
0f74ddb
0ec7bda
fd31a72
fae4ad8
ffa5e6d
f11e92a
f34a133
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ import ( | |
| "context" | ||
| "math/big" | ||
| "strings" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/cenkalti/backoff/v4" | ||
| "github.com/morph-l2/go-ethereum" | ||
|
|
@@ -27,27 +29,96 @@ const ( | |
| ) | ||
|
|
||
| type RetryableClient struct { | ||
| authClient *authclient.Client | ||
| ethClient *ethclient.Client | ||
| b backoff.BackOff | ||
| logger tmlog.Logger | ||
| legacyAuthClient *authclient.Client | ||
| legacyEthClient *ethclient.Client | ||
| authClient *authclient.Client | ||
| ethClient *ethclient.Client | ||
| mptTime uint64 | ||
| mpt atomic.Bool | ||
| b backoff.BackOff | ||
| logger tmlog.Logger | ||
| } | ||
|
|
||
| // NewRetryableClient make the client retryable | ||
| // Will retry calling the api, if the connection is refused | ||
| func NewRetryableClient(authClient *authclient.Client, ethClient *ethclient.Client, logger tmlog.Logger) *RetryableClient { | ||
| func NewRetryableClient(legacyAuthClient *authclient.Client, legacyEthClient *ethclient.Client, authClient *authclient.Client, ethClient *ethclient.Client, logger tmlog.Logger) *RetryableClient { | ||
| logger = logger.With("module", "retryClient") | ||
| return &RetryableClient{ | ||
| authClient: authClient, | ||
| ethClient: ethClient, | ||
| b: backoff.NewExponentialBackOff(), | ||
| logger: logger, | ||
| legacyAuthClient: legacyAuthClient, | ||
| legacyEthClient: legacyEthClient, | ||
| authClient: authClient, | ||
| ethClient: ethClient, | ||
| mptTime: uint64(time.Now().Add(time.Hour).Unix()), // TODO: make configurable | ||
| b: backoff.NewExponentialBackOff(), | ||
| logger: logger, | ||
| } | ||
| } | ||
|
|
||
| func (rc *RetryableClient) aClient() *authclient.Client { | ||
| if !rc.mpt.Load() { | ||
| return rc.legacyAuthClient | ||
| } | ||
| return rc.authClient | ||
| } | ||
|
|
||
| func (rc *RetryableClient) eClient() *ethclient.Client { | ||
| if !rc.mpt.Load() { | ||
| return rc.legacyEthClient | ||
| } | ||
| return rc.ethClient | ||
| } | ||
|
|
||
| func (rc *RetryableClient) switchClient(ctx context.Context, timeStamp uint64, number uint64) { | ||
| if rc.mpt.Load() || timeStamp <= rc.mptTime { | ||
| return | ||
| } | ||
|
|
||
| rc.logger.Info("MPT switch time reached, MUST wait for MPT node to sync", | ||
| "mpt_time", rc.mptTime, | ||
| "current_time", timeStamp, | ||
| "target_block", number) | ||
|
|
||
| ticker := time.NewTicker(3 * time.Second) | ||
| defer ticker.Stop() | ||
|
|
||
| startTime := time.Now() | ||
| lastLogTime := startTime | ||
|
|
||
| for { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add maxWaitTime here? because this function will block the normal new block process |
||
| remote, err := rc.ethClient.BlockNumber(ctx) | ||
| if err != nil { | ||
| rc.logger.Error("Failed to get MPT block number, retrying...", "error", err) | ||
| <-ticker.C | ||
| continue | ||
| } | ||
|
|
||
| if remote+1 >= number { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we remove the debug code? |
||
| rc.mpt.Store(true) | ||
| rc.logger.Info("Successfully switched to MPT client", | ||
| "remote_block", remote, | ||
| "target_block", number, | ||
| "wait_duration", time.Since(startTime)) | ||
| return | ||
| } | ||
|
|
||
| if time.Since(lastLogTime) >= 10*time.Second { | ||
| rc.logger.Info("Waiting for MPT node to sync...", | ||
| "remote_block", remote, | ||
| "target_block", number, | ||
| "blocks_behind", number-remote-1, | ||
| "wait_duration", time.Since(startTime)) | ||
| lastLogTime = time.Now() | ||
| } | ||
|
|
||
| <-ticker.C | ||
| } | ||
|
Comment on lines
+250
to
293
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical: Blocking loop lacks timeout and context cancellation, risking indefinite hang. The polling loop that waits for the next geth to sync has no exit conditions other than successful synchronization:
This could cause the entire node to hang during an upgrade if the next geth is unavailable. Proposed fix for {
+ select {
+ case <-ctx.Done():
+ rc.logger.Error("Context cancelled while waiting for next geth to sync",
+ "error", ctx.Err(),
+ "wait_duration", time.Since(startTime))
+ return
+ default:
+ }
+
remote, err := rc.nextEthClient.BlockNumber(ctx)
if err != nil {
rc.logger.Error("Failed to get next geth block number",
"error", err,
"hint", "Please ensure next geth is running and accessible")
- <-ticker.C
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ }
continue
}Consider also adding a maximum wait timeout (e.g., configurable or a sensible default like 30 minutes) to prevent indefinite blocking in production. 🤖 Prompt for AI Agents |
||
| } | ||
|
Comment on lines
+224
to
294
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Infinite blocking loop can halt the node permanently. The Additionally:
Consider adding:
🛠️ Suggested approach with timeout func (rc *RetryableClient) switchClient(ctx context.Context, timeStamp uint64, number uint64) {
if rc.switched.Load() {
return
}
if timeStamp < rc.switchTime {
return
}
+ // Add a reasonable timeout for the switch operation
+ switchCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
+ defer cancel()
// ... existing logging ...
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
+ select {
+ case <-switchCtx.Done():
+ rc.logger.Error("Switch timeout exceeded, failing switch operation")
+ return // or panic, depending on desired behavior
+ default:
+ }
+
remote, err := rc.nextEthClient.BlockNumber(ctx)
// ... rest of loop ...
}
}🤖 Prompt for AI Agents |
||
|
|
||
| func (rc *RetryableClient) AssembleL2Block(ctx context.Context, number *big.Int, transactions eth.Transactions) (ret *catalyst.ExecutableL2Data, err error) { | ||
| timestamp := uint64(time.Now().Unix()) | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.authClient.AssembleL2Block(ctx, number, transactions) | ||
| rc.switchClient(ctx, timestamp, number.Uint64()) | ||
| resp, respErr := rc.aClient().AssembleL2Block(ctx, ×tamp, number, transactions) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to AssembleL2Block", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -64,8 +135,9 @@ func (rc *RetryableClient) AssembleL2Block(ctx context.Context, number *big.Int, | |
| } | ||
|
|
||
| func (rc *RetryableClient) ValidateL2Block(ctx context.Context, executableL2Data *catalyst.ExecutableL2Data) (ret bool, err error) { | ||
| rc.switchClient(ctx, executableL2Data.Timestamp, executableL2Data.Number) | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.authClient.ValidateL2Block(ctx, executableL2Data) | ||
| resp, respErr := rc.aClient().ValidateL2Block(ctx, executableL2Data) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to ValidateL2Block", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -82,8 +154,9 @@ func (rc *RetryableClient) ValidateL2Block(ctx context.Context, executableL2Data | |
| } | ||
|
|
||
| func (rc *RetryableClient) NewL2Block(ctx context.Context, executableL2Data *catalyst.ExecutableL2Data, batchHash *common.Hash) (err error) { | ||
| rc.switchClient(ctx, executableL2Data.Timestamp, executableL2Data.Number) | ||
| if retryErr := backoff.Retry(func() error { | ||
| respErr := rc.authClient.NewL2Block(ctx, executableL2Data, batchHash) | ||
| respErr := rc.aClient().NewL2Block(ctx, executableL2Data, batchHash) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to NewL2Block", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -99,8 +172,9 @@ func (rc *RetryableClient) NewL2Block(ctx context.Context, executableL2Data *cat | |
| } | ||
|
|
||
| func (rc *RetryableClient) NewSafeL2Block(ctx context.Context, safeL2Data *catalyst.SafeL2Data) (ret *eth.Header, err error) { | ||
| rc.switchClient(ctx, safeL2Data.Timestamp, safeL2Data.Number) | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.authClient.NewSafeL2Block(ctx, safeL2Data) | ||
| resp, respErr := rc.aClient().NewSafeL2Block(ctx, safeL2Data) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to NewSafeL2Block", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -118,7 +192,7 @@ func (rc *RetryableClient) NewSafeL2Block(ctx context.Context, safeL2Data *catal | |
|
|
||
| func (rc *RetryableClient) CommitBatch(ctx context.Context, batch *eth.RollupBatch, signatures []eth.BatchSignature) (err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| respErr := rc.authClient.CommitBatch(ctx, batch, signatures) | ||
| respErr := rc.aClient().CommitBatch(ctx, batch, signatures) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to CommitBatch", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -135,7 +209,7 @@ func (rc *RetryableClient) CommitBatch(ctx context.Context, batch *eth.RollupBat | |
|
|
||
| func (rc *RetryableClient) AppendBlsSignature(ctx context.Context, batchHash common.Hash, signature eth.BatchSignature) (err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| respErr := rc.authClient.AppendBlsSignature(ctx, batchHash, signature) | ||
| respErr := rc.aClient().AppendBlsSignature(ctx, batchHash, signature) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to call AppendBlsSignature", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -152,7 +226,7 @@ func (rc *RetryableClient) AppendBlsSignature(ctx context.Context, batchHash com | |
|
|
||
| func (rc *RetryableClient) BlockNumber(ctx context.Context) (ret uint64, err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.ethClient.BlockNumber(ctx) | ||
| resp, respErr := rc.eClient().BlockNumber(ctx) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to call BlockNumber", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -170,7 +244,7 @@ func (rc *RetryableClient) BlockNumber(ctx context.Context) (ret uint64, err err | |
|
|
||
| func (rc *RetryableClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (ret *eth.Header, err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.ethClient.HeaderByNumber(ctx, blockNumber) | ||
| resp, respErr := rc.eClient().HeaderByNumber(ctx, blockNumber) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to call BlockNumber", "error", respErr) | ||
|
curryxbo marked this conversation as resolved.
|
||
| if retryableError(respErr) { | ||
|
|
@@ -188,7 +262,7 @@ func (rc *RetryableClient) HeaderByNumber(ctx context.Context, blockNumber *big. | |
|
|
||
| func (rc *RetryableClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) (ret []byte, err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.ethClient.CallContract(ctx, call, blockNumber) | ||
| resp, respErr := rc.eClient().CallContract(ctx, call, blockNumber) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to call eth_call", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
@@ -206,7 +280,7 @@ func (rc *RetryableClient) CallContract(ctx context.Context, call ethereum.CallM | |
|
|
||
| func (rc *RetryableClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) (ret []byte, err error) { | ||
| if retryErr := backoff.Retry(func() error { | ||
| resp, respErr := rc.ethClient.CodeAt(ctx, contract, blockNumber) | ||
| resp, respErr := rc.eClient().CodeAt(ctx, contract, blockNumber) | ||
| if respErr != nil { | ||
| rc.logger.Info("failed to call eth_getCode", "error", respErr) | ||
| if retryableError(respErr) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make
mptTimeconfigurable instead of hard‑codingnow+1h.Right now
mptTimeis derived fromtime.Now().Add(time.Hour)at construction, which means:Before shipping, this should be driven by config (or passed explicitly into
NewRetryableClient) so that routing behavior is deterministic and restart‑safe, not tied to process start time. The existing TODO hints at this, but it’s important for correctness of the migration window.Also applies to: 43-55
🧩 Analysis chain
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 90
🏁 Script executed:
# Read the entire file to understand the structure and all usages of rc.b cat -n node/types/retryable_client.goRepository: morph-l2/morph
Length of output: 11289
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 3325
🌐 Web query:
cenkalti/backoff v4 ExponentialBackOff thread-safe concurrent💡 Result:
Short answer: No — ExponentialBackOff in cenkalti/backoff v4 is explicitly documented as not thread‑safe for concurrent use. See the package docs and source which state "Note: Implementation is not thread‑safe." [1][2]
If you need concurrent use, either:
Sources:
[1] pkg.go.dev github.com/cenkalti/backoff/v4 (ExponentialBackOff docs).
[2] ExponentialBackOff implementation (v4) in repository/source.
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 2510
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 454
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 8154
🏁 Script executed:
Repository: morph-l2/morph
Length of output: 174
Shared
backoff.BackOffinstance is not thread-safe and unsafe for concurrent use.The single
rc.binstance is shared across all retry calls inAssembleL2Block,ValidateL2Block,NewL2Block,NewSafeL2Block,CommitBatch,AppendBlsSignature,BlockNumber,HeaderByNumber,CallContract, andCodeAtmethods. Sincecenkalti/backoff/v4'sExponentialBackOffis explicitly documented as not thread-safe, andRetryableClientis used concurrently by sequencer, derivation, and executor subsystems, concurrent calls will cause data race conditions on the backoff state.Create a new
ExponentialBackOffper request instead:type RetryableClient struct { - b backoff.BackOff + // remove shared BackOff; create per-call instances instead } func (rc *RetryableClient) AssembleL2Block(...) (..., error) { - if retryErr := backoff.Retry(func() error { ... }, rc.b); retryErr != nil { + b := backoff.NewExponentialBackOff() + if retryErr := backoff.Retry(func() error { ... }, b); retryErr != nil { ... } }Apply the same fix to all other methods using
backoff.Retry.🤖 Prompt for AI Agents