diff --git a/common/version/version.go b/common/version/version.go index df872f409f..592f969612 100644 --- a/common/version/version.go +++ b/common/version/version.go @@ -5,7 +5,7 @@ import ( "runtime/debug" ) -var tag = "v4.5.46" +var tag = "v4.5.47" var commit = func() string { if info, ok := debug.ReadBuildInfo(); ok { diff --git a/rollup/internal/config/relayer.go b/rollup/internal/config/relayer.go index db2e274681..b67deb4dfd 100644 --- a/rollup/internal/config/relayer.go +++ b/rollup/internal/config/relayer.go @@ -7,8 +7,14 @@ import ( // SenderConfig The config for transaction sender type SenderConfig struct { - // The RPC endpoint of the ethereum or scroll public node. + // The RPC endpoint of the ethereum or scroll public node (for backward compatibility). + // If WriteEndpoints is specified, this endpoint will be used only for reading. + // If WriteEndpoints is empty, this endpoint will be used for both reading and writing. Endpoint string `json:"endpoint"` + // The RPC endpoints to send transactions to (optional). + // If specified, transactions will be sent to all these endpoints in parallel. + // If empty, transactions will be sent to the Endpoint. + WriteEndpoints []string `json:"write_endpoints,omitempty"` // The time to trigger check pending txs in sender. CheckPendingTime uint64 `json:"check_pending_time"` // The number of blocks to wait to escalate increase gas price of the transaction. diff --git a/rollup/internal/controller/relayer/relayer_test.go b/rollup/internal/controller/relayer/relayer_test.go index 48a65578cd..322a259aed 100644 --- a/rollup/internal/controller/relayer/relayer_test.go +++ b/rollup/internal/controller/relayer/relayer_test.go @@ -56,6 +56,7 @@ func setupEnv(t *testing.T) { cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetPoSL1EndPoint() assert.NoError(t, err) + cfg.L2Config.RelayerConfig.SenderConfig.WriteEndpoints = []string{cfg.L2Config.RelayerConfig.SenderConfig.Endpoint, cfg.L2Config.RelayerConfig.SenderConfig.Endpoint} cfg.L1Config.RelayerConfig.SenderConfig.Endpoint, err = testApps.GetL2GethEndPoint() assert.NoError(t, err) diff --git a/rollup/internal/controller/sender/sender.go b/rollup/internal/controller/sender/sender.go index 8be721b097..10ea8e8305 100644 --- a/rollup/internal/controller/sender/sender.go +++ b/rollup/internal/controller/sender/sender.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "time" "github.com/holiman/uint256" @@ -67,7 +68,8 @@ type FeeData struct { type Sender struct { config *config.SenderConfig gethClient *gethclient.Client - client *ethclient.Client // The client to retrieve on chain data or send transaction. + client *ethclient.Client // The client to retrieve on chain data (read-only) + writeClients []*ethclient.Client // The clients to send transactions to (write operations) transactionSigner *TransactionSigner chainID *big.Int // The chain id of the endpoint ctx context.Context @@ -90,9 +92,10 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c return nil, fmt.Errorf("invalid params, EscalateMultipleNum; %v, EscalateMultipleDen: %v", config.EscalateMultipleNum, config.EscalateMultipleDen) } + // Initialize read client rpcClient, err := rpc.Dial(config.Endpoint) if err != nil { - return nil, fmt.Errorf("failed to dial eth client, err: %w", err) + return nil, fmt.Errorf("failed to dial read client, err: %w", err) } client := ethclient.NewClient(rpcClient) @@ -105,12 +108,42 @@ func NewSender(ctx context.Context, config *config.SenderConfig, signerConfig *c return nil, fmt.Errorf("failed to create transaction signer, err: %w", err) } + // Initialize write clients + var writeClients []*ethclient.Client + if len(config.WriteEndpoints) > 0 { + // Use specified write endpoints + for i, endpoint := range config.WriteEndpoints { + writeRpcClient, err := rpc.Dial(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to dial write client %d (endpoint: %s), err: %w", i, endpoint, err) + } + writeClient := ethclient.NewClient(writeRpcClient) + + // Verify the write client is connected to the same chain + writeChainID, err := writeClient.ChainID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get chain ID from write client %d (endpoint: %s), err: %w", i, endpoint, err) + } + if writeChainID.Cmp(chainID) != 0 { + return nil, fmt.Errorf("write client %d (endpoint: %s) has different chain ID %s, expected %s", i, endpoint, writeChainID.String(), chainID.String()) + } + + writeClients = append(writeClients, writeClient) + } + log.Info("initialized sender with multiple write clients", "service", service, "name", name, "readEndpoint", config.Endpoint, "writeEndpoints", config.WriteEndpoints) + } else { + // Use read client for writing (backward compatibility) + writeClients = append(writeClients, client) + log.Info("initialized sender with single client", "service", service, "name", name, "endpoint", config.Endpoint) + } + // Create sender instance first and then initialize nonce sender := &Sender{ ctx: ctx, config: config, gethClient: gethclient.New(rpcClient), client: client, + writeClients: writeClients, chainID: chainID, transactionSigner: transactionSigner, db: db, @@ -169,6 +202,82 @@ func (s *Sender) getFeeData(target *common.Address, data []byte, sidecar *gethTy } } +// sendTransactionToMultipleClients sends a transaction to all write clients in parallel +// and returns success if at least one client succeeds +func (s *Sender) sendTransactionToMultipleClients(signedTx *gethTypes.Transaction) error { + ctx, cancel := context.WithTimeout(s.ctx, 15*time.Second) + defer cancel() + + if len(s.writeClients) == 1 { + // Single client - use direct approach + return s.writeClients[0].SendTransaction(ctx, signedTx) + } + + // Multiple clients - send in parallel + type result struct { + endpoint string + err error + } + + resultChan := make(chan result, len(s.writeClients)) + var wg sync.WaitGroup + + // Send transaction to all write clients in parallel + for i, client := range s.writeClients { + wg.Add(1) + // Determine endpoint URL for this client + endpoint := s.config.WriteEndpoints[i] + + go func(ep string, writeClient *ethclient.Client) { + defer wg.Done() + err := writeClient.SendTransaction(ctx, signedTx) + resultChan <- result{endpoint: ep, err: err} + }(endpoint, client) + } + + // Wait for all goroutines to finish + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect results + var errs []error + for res := range resultChan { + if res.err != nil { + errs = append(errs, fmt.Errorf("%s: %w", res.endpoint, res.err)) + log.Warn("failed to send transaction to write client", + "endpoint", res.endpoint, + "txHash", signedTx.Hash().Hex(), + "nonce", signedTx.Nonce(), + "from", s.transactionSigner.GetAddr().String(), + "error", res.err) + } else { + log.Info("successfully sent transaction to write client", + "endpoint", res.endpoint, + "txHash", signedTx.Hash().Hex(), + "nonce", signedTx.Nonce(), + "from", s.transactionSigner.GetAddr().String()) + } + } + + // Check if at least one client succeeded + if len(errs) < len(s.writeClients) { + successCount := len(s.writeClients) - len(errs) + if len(errs) > 0 { + log.Info("transaction partially succeeded", + "txHash", signedTx.Hash().Hex(), + "successCount", successCount, + "totalClients", len(s.writeClients), + "failures", errors.Join(errs...)) + } + return nil + } + + // All clients failed + return fmt.Errorf("failed to send transaction to all %d write clients: %w", len(s.writeClients), errors.Join(errs...)) +} + // SendTransaction send a signed L2tL1 transaction. func (s *Sender) SendTransaction(contextID string, target *common.Address, data []byte, blobs []*kzg4844.Blob) (common.Hash, uint64, error) { s.metrics.sendTransactionTotal.WithLabelValues(s.service, s.name).Inc() @@ -230,7 +339,7 @@ func (s *Sender) SendTransaction(contextID string, target *common.Address, data return common.Hash{}, 0, fmt.Errorf("failed to insert transaction, err: %w", err) } - if err := s.client.SendTransaction(s.ctx, signedTx); err != nil { + if err := s.sendTransactionToMultipleClients(signedTx); err != nil { // Delete the transaction from the pending transaction table if it fails to send. if updateErr := s.pendingTransactionOrm.DeleteTransactionByTxHash(s.ctx, signedTx.Hash()); updateErr != nil { log.Error("failed to delete transaction", "tx hash", signedTx.Hash().String(), "from", s.transactionSigner.GetAddr().String(), "nonce", signedTx.Nonce(), "err", updateErr) @@ -645,7 +754,7 @@ func (s *Sender) checkPendingTransaction() { return } - if err := s.client.SendTransaction(s.ctx, newSignedTx); err != nil { + if err := s.sendTransactionToMultipleClients(newSignedTx); err != nil { if strings.Contains(err.Error(), "nonce too low") { // When we receive a 'nonce too low' error but cannot find the transaction receipt, it indicates another transaction with this nonce has already been processed, so this transaction will never be mined and should be marked as failed. log.Warn("nonce too low detected, marking all non-confirmed transactions with same nonce as failed", "nonce", originalTx.Nonce(), "address", s.transactionSigner.GetAddr().Hex(), "txHash", originalTx.Hash().Hex(), "newTxHash", newSignedTx.Hash().Hex(), "err", err)