diff --git a/CHANGELOG.md b/CHANGELOG.md index cfd8ab23a7..f8d6bd3e91 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Implement forced inclusion and based sequencing ([#2797](https://github.com/evstack/ev-node/pull/2797)) This changes requires to add a `da_epoch_forced_inclusion` field in `genesis.json` file. To enable this feature, set the force inclusion namespace in the `evnode.yaml`. +- Added `post-tx` command and force inclusion server to submit transaction directly to the DA layer. ([#2888](https://github.com/evstack/ev-node/pull/2888)) + Additionally, modified the core package to support marking transactions as forced included transactions. + The execution client ought to perform basic validation on those transactions as they have skipped the execution client's mempool. ### Changed diff --git a/apps/evm/README.md b/apps/evm/README.md index 614039bed3..c52a0c9415 100644 --- a/apps/evm/README.md +++ b/apps/evm/README.md @@ -103,3 +103,69 @@ If you'd ever like to restart a fresh node, make sure to remove the originally c ```bash rm -rf ~/.evm-full-node ``` + +## Force Inclusion API + +The EVM app includes a Force Inclusion API server that exposes an Ethereum-compatible JSON-RPC endpoint for submitting transactions directly to the DA layer for force inclusion. + +When enabling this server, the node operator will be paying for the gas costs of the transactions submitted through the API on the DA layer. The application costs are still paid by the transaction signer. + +### Enabling the Force Inclusion API + +To enable the Force Inclusion API server, add the following flag when starting the node: + +```bash +./evm start \ + --force-inclusion-server="127.0.0.1:8547" \ + ... other flags ... +``` + +### Configuration Flag + +- `--force-inclusion-server`: Address for the force inclusion API server (e.g. `127.0.0.1:8547`). If set, enables the server for direct DA submission + +### Usage + +Once enabled, the server exposes a standard Ethereum JSON-RPC endpoint that accepts `eth_sendRawTransaction` calls: + +```bash +# Send a raw transaction for force inclusion +curl -X POST http://127.0.0.1:8547 \ + -H "Content-Type: application/json" \ + -d '{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_sendRawTransaction", + "params": ["0x02f873..."] + }' +``` + +The transaction will be submitted directly to the DA layer in the force inclusion namespace, bypassing the normal mempool. The response returns a pseudo-transaction hash based on the DA height: + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "result": "0x0000000000000000000000000000000000000000000000000000000000000064" +} +``` + +### Using with Spamoor + +You can use this endpoint with [spamoor](https://github.com/ethpandaops/spamoor) for force inclusion testing: + +```bash +spamoor \ + --rpc-url http://127.0.0.1:8547 \ + --private-key \ + ... other spamoor flags ... +``` + +### Force Inclusion Timing + +Transactions submitted to the Force Inclusion API are included in the chain at specific DA heights based on the `da_epoch_forced_inclusion` configuration in `genesis.json`. The API logs will show when the transaction will be force included: + +``` +INF transaction successfully submitted to DA layer da_height=100 +INF transaction will be force included blocks_until_inclusion=8 inclusion_at_height=110 +``` diff --git a/apps/evm/cmd/post_tx_cmd.go b/apps/evm/cmd/post_tx_cmd.go new file mode 100644 index 0000000000..7b03a0926a --- /dev/null +++ b/apps/evm/cmd/post_tx_cmd.go @@ -0,0 +1,233 @@ +package cmd + +import ( + "encoding/hex" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/spf13/cobra" + + evblock "github.com/evstack/ev-node/block" + "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/da/jsonrpc" + rollcmd "github.com/evstack/ev-node/pkg/cmd" + rollconf "github.com/evstack/ev-node/pkg/config" + genesispkg "github.com/evstack/ev-node/pkg/genesis" + seqcommon "github.com/evstack/ev-node/sequencers/common" + "github.com/evstack/ev-node/types" +) + +const ( + flagNamespace = "namespace" + flagGasPrice = "gas-price" +) + +// PostTxCmd returns a command to post a signed Ethereum transaction to the DA layer +func PostTxCmd() *cobra.Command { + cobraCmd := &cobra.Command{ + Use: "post-tx", + Short: "Post a signed Ethereum transaction to the DA layer", + Long: `Post a signed Ethereum transaction to the DA layer using the Evolve configuration. + +This command submits a signed Ethereum transaction tzo the configured DA layer for forced inclusion. +The transaction is provided as an argument, which accepts either: + 1. A hex-encoded signed transaction (with or without 0x prefix) + 2. A path to a file containing the hex-encoded transaction + 3. A JSON object with a "raw" field containing the hex-encoded transaction + +The command automatically detects the input format. + +Examples: + # From hex string + evm post-tx 0x02f873... + + # From file + evm post-tx tx.txt + + # From JSON + evm post-tx '{"raw":"0x02f873..."}' +`, + Args: cobra.ExactArgs(1), + RunE: postTxRunE, + } + + // Add evolve config flags + rollconf.AddFlags(cobraCmd) + + // Add command-specific flags + cobraCmd.Flags().String(flagNamespace, "", "DA namespace ID (if not provided, uses config namespace)") + cobraCmd.Flags().Float64(flagGasPrice, -1, "Gas price for DA submission (if not provided, uses config gas price)") + + return cobraCmd +} + +// postTxRunE executes the post-tx command +func postTxRunE(cmd *cobra.Command, args []string) error { + nodeConfig, err := rollcmd.ParseConfig(cmd) + if err != nil { + return err + } + + logger := rollcmd.SetupLogger(nodeConfig.Log) + + txInput := args[0] + if txInput == "" { + return fmt.Errorf("transaction cannot be empty") + } + + var txData []byte + if _, err := os.Stat(txInput); err == nil { + // Input is a file path + txData, err = decodeTxFromFile(txInput) + if err != nil { + return fmt.Errorf("failed to decode transaction from file: %w", err) + } + } else { + // Input is a JSON string + txData, err = decodeTxFromJSON(txInput) + if err != nil { + return fmt.Errorf("failed to decode transaction from JSON: %w", err) + } + } + + if len(txData) == 0 { + return fmt.Errorf("transaction data cannot be empty") + } + + // Get namespace (use flag if provided, otherwise use config) + namespace, _ := cmd.Flags().GetString(flagNamespace) + if namespace == "" { + namespace = nodeConfig.DA.GetForcedInclusionNamespace() + } + + if namespace == "" { + return fmt.Errorf("forced inclusionnamespace cannot be empty") + } + + namespaceBz := da.NamespaceFromString(namespace).Bytes() + + // Get gas price (use flag if provided, otherwise use config) + gasPrice, err := cmd.Flags().GetFloat64(flagGasPrice) + if err != nil { + return fmt.Errorf("failed to get gas-price flag: %w", err) + } + + logger.Info().Str("namespace", namespace).Float64("gas_price", gasPrice).Int("tx_size", len(txData)).Msg("posting transaction to DA layer") + + daClient, err := jsonrpc.NewClient( + cmd.Context(), + logger, + nodeConfig.DA.Address, + nodeConfig.DA.AuthToken, + seqcommon.AbsoluteMaxBlobSize, + ) + if err != nil { + return fmt.Errorf("failed to create DA client: %w", err) + } + + // Submit transaction to DA layer + logger.Info().Msg("submitting transaction to DA layer...") + + blobs := [][]byte{txData} + options := []byte(nodeConfig.DA.SubmitOptions) + + dac := evblock.NewDAClient(&daClient.DA, nodeConfig, logger) + result := dac.Submit(cmd.Context(), blobs, gasPrice, namespaceBz, options) + + // Check result + switch result.Code { + case da.StatusSuccess: + logger.Info().Msg("transaction successfully submitted to DA layer") + cmd.Printf("\n✓ Transaction posted successfully\n\n") + cmd.Printf("Namespace: %s\n", namespace) + cmd.Printf("DA Height: %d\n", result.Height) + cmd.Printf("Data Size: %d bytes\n", len(txData)) + + genesisPath := filepath.Join(filepath.Dir(nodeConfig.ConfigPath()), "genesis.json") + genesis, err := genesispkg.LoadGenesis(genesisPath) + if err != nil { + return fmt.Errorf("failed to load genesis for calculating inclusion time estimate: %w", err) + } + + _, epochEnd, _ := types.CalculateEpochBoundaries(result.Height, genesis.DAStartHeight, genesis.DAEpochForcedInclusion) + cmd.Printf( + "DA Blocks until inclusion: %d (at DA height %d)\n", + epochEnd-(result.Height+1), + epochEnd+1, + ) + + cmd.Printf("\n") + return nil + + case da.StatusTooBig: + return fmt.Errorf("transaction too large for DA layer: %s", result.Message) + + case da.StatusNotIncludedInBlock: + return fmt.Errorf("transaction not included in DA block: %s", result.Message) + + case da.StatusAlreadyInMempool: + cmd.Printf("⚠ Transaction already in mempool\n") + if result.Height > 0 { + cmd.Printf(" DA Height: %d\n", result.Height) + } + return nil + + case da.StatusContextCanceled: + return fmt.Errorf("submission canceled: %s", result.Message) + + default: + return fmt.Errorf("DA submission failed (code: %d): %s", result.Code, result.Message) + } +} + +// decodeTxFromFile reads an Ethereum transaction from a file and decodes it to bytes +func decodeTxFromFile(filePath string) ([]byte, error) { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("reading file: %w", err) + } + + return decodeTxFromJSON(string(data)) +} + +// decodeTxFromJSON decodes an Ethereum transaction from various formats to bytes +func decodeTxFromJSON(input string) ([]byte, error) { + input = strings.TrimSpace(input) + + // Try to decode as JSON with "raw" field + var txJSON map[string]any + if err := json.Unmarshal([]byte(input), &txJSON); err == nil { + if rawTx, ok := txJSON["raw"].(string); ok { + return decodeHexTx(rawTx) + } + return nil, fmt.Errorf("JSON must contain 'raw' field with hex-encoded transaction") + } + + // Try to decode as hex string directly + return decodeHexTx(input) +} + +// decodeHexTx decodes a hex-encoded Ethereum transaction +func decodeHexTx(hexStr string) ([]byte, error) { + hexStr = strings.TrimSpace(hexStr) + + // Remove 0x prefix if present + if strings.HasPrefix(hexStr, "0x") || strings.HasPrefix(hexStr, "0X") { + hexStr = hexStr[2:] + } + + // Decode hex string to bytes + txBytes, err := hex.DecodeString(hexStr) + if err != nil { + return nil, fmt.Errorf("decoding hex transaction: %w", err) + } + + if len(txBytes) == 0 { + return nil, fmt.Errorf("decoded transaction is empty") + } + + return txBytes, nil +} diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index b22599a705..f2ebf62ab3 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-datastore" @@ -29,9 +30,14 @@ import ( "github.com/evstack/ev-node/sequencers/based" seqcommon "github.com/evstack/ev-node/sequencers/common" "github.com/evstack/ev-node/sequencers/single" + + "github.com/evstack/ev-node/apps/evm/server" ) -const evmDbName = "evm-single" +const ( + flagForceInclusionServer = "force-inclusion-server" + evmDbName = "evm-single" +) var RunCmd = &cobra.Command{ Use: "start", @@ -96,6 +102,44 @@ var RunCmd = &cobra.Command{ return err } + // Start force inclusion API server if address is provided + forceInclusionAddr, err := cmd.Flags().GetString(flagForceInclusionServer) + if err != nil { + return fmt.Errorf("failed to get '%s' flag: %w", flagForceInclusionServer, err) + } + + if forceInclusionAddr != "" { + ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL) + if err != nil { + return fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err) + } + + forceInclusionServer, err := server.NewForceInclusionServer( + forceInclusionAddr, + &daJrpc.DA, + nodeConfig, + genesis, + logger, + ethURL, + ) + if err != nil { + return fmt.Errorf("failed to create force inclusion server: %w", err) + } + + if err := forceInclusionServer.Start(cmd.Context()); err != nil { + return fmt.Errorf("failed to start force inclusion API server: %w", err) + } + + // Ensure server is stopped when node stops + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := forceInclusionServer.Stop(shutdownCtx); err != nil { + logger.Error().Err(err).Msg("failed to stop force inclusion API server") + } + }() + } + return rollcmd.StartNode(logger, cmd, executor, sequencer, &daJrpc.DA, p2pClient, datastore, nodeConfig, genesis, node.NodeOptions{}) }, } @@ -213,4 +257,5 @@ func addFlags(cmd *cobra.Command) { cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication") cmd.Flags().String(evm.FlagEvmGenesisHash, "", "Hash of the genesis block") cmd.Flags().String(evm.FlagEvmFeeRecipient, "", "Address that will receive transaction fees") + cmd.Flags().String(flagForceInclusionServer, "", "Address for force inclusion API server (e.g. 127.0.0.1:8547). If set, enables the server for direct DA submission") } diff --git a/apps/evm/go.mod b/apps/evm/go.mod index 9dfbf8adf7..2db1225727 100644 --- a/apps/evm/go.mod +++ b/apps/evm/go.mod @@ -8,6 +8,7 @@ replace ( github.com/evstack/ev-node => ../../ github.com/evstack/ev-node/core => ../../core github.com/evstack/ev-node/da => ../../da + github.com/evstack/ev-node/execution/evm => ../../execution/evm ) require ( @@ -20,6 +21,7 @@ require ( github.com/ipfs/go-datastore v0.9.0 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.1 + gotest.tools/v3 v3.5.2 ) require ( @@ -65,6 +67,7 @@ require ( github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/snappy v1.0.0 // indirect github.com/google/flatbuffers v24.12.23+incompatible // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect diff --git a/apps/evm/go.sum b/apps/evm/go.sum index 75e0c541b7..47e9003c79 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -105,8 +105,6 @@ github.com/ethereum/go-ethereum v1.16.7 h1:qeM4TvbrWK0UC0tgkZ7NiRsmBGwsjqc64BHo2 github.com/ethereum/go-ethereum v1.16.7/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node/execution/evm v1.0.0-beta.3 h1:xo0mZz3CJtntP1RPLFDBubBKpNkqStImt9H9N0xysj8= -github.com/evstack/ev-node/execution/evm v1.0.0-beta.3/go.mod h1:yazCKZaVczYwizfHYSQ4KIYqW0d42M7q7e9AxuSXV3s= github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= github.com/filecoin-project/go-clock v0.1.0 h1:SFbYIM75M8NnFm1yMHhN9Ahy3W5bEZV9gd6MPfXbKVU= diff --git a/apps/evm/server/force_inclusion.go b/apps/evm/server/force_inclusion.go new file mode 100644 index 0000000000..f0f7443489 --- /dev/null +++ b/apps/evm/server/force_inclusion.go @@ -0,0 +1,350 @@ +package server + +import ( + "context" + "encoding/binary" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/rs/zerolog" + + "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/types" +) + +const ( + defaultForceInclusionAddr = "127.0.0.1:8547" + defaultReadTimeout = 30 * time.Second + defaultWriteTimeout = 30 * time.Second +) + +// ForceInclusionServer provides an Ethereum-compatible JSON-RPC endpoint +// that accepts transactions and submits them directly to the DA layer for force inclusion +type ForceInclusionServer struct { + server *http.Server + daClient da.DA + config config.Config + genesis genesis.Genesis + logger zerolog.Logger + namespace []byte + executionRPCURL string + httpClient *http.Client +} + +// NewForceInclusionServer creates a new force inclusion server +func NewForceInclusionServer( + addr string, + daClient da.DA, + cfg config.Config, + gen genesis.Genesis, + logger zerolog.Logger, + executionRPCURL string, +) (*ForceInclusionServer, error) { + if addr == "" { + addr = defaultForceInclusionAddr + } + + if len(cfg.DA.GetForcedInclusionNamespace()) == 0 { + return nil, errors.New("forced inclusion namespace is empty") + } + + namespace := da.NamespaceFromString(cfg.DA.GetForcedInclusionNamespace()).Bytes() + + s := &ForceInclusionServer{ + daClient: daClient, + config: cfg, + genesis: gen, + logger: logger.With().Str("module", "force_inclusion_api").Logger(), + namespace: namespace, + executionRPCURL: executionRPCURL, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } + + mux := http.NewServeMux() + mux.HandleFunc("/", s.handleJSONRPC) + + s.server = &http.Server{ + Addr: addr, + Handler: mux, + ReadTimeout: defaultReadTimeout, + WriteTimeout: defaultWriteTimeout, + } + + return s, nil +} + +// Start starts the HTTP server +func (s *ForceInclusionServer) Start(ctx context.Context) error { + s.logger.Info(). + Str("address", s.server.Addr). + Msg("starting force inclusion API server") + + errChan := make(chan error, 1) + go func() { + if err := s.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + errChan <- err + } + }() + + select { + case err := <-errChan: + return fmt.Errorf("failed to start force inclusion server: %w", err) + case <-time.After(100 * time.Millisecond): + return nil + } +} + +// Stop gracefully stops the HTTP server +func (s *ForceInclusionServer) Stop(ctx context.Context) error { + s.logger.Info().Msg("stopping force inclusion API server") + return s.server.Shutdown(ctx) +} + +// JSONRPCRequest represents a JSON-RPC 2.0 request +type JSONRPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +// JSONRPCResponse represents a JSON-RPC 2.0 response +type JSONRPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID interface{} `json:"id"` + Result interface{} `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` +} + +// RPCError represents a JSON-RPC error +type RPCError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// Standard JSON-RPC error codes +const ( + ParseError = -32700 + InvalidRequest = -32600 + MethodNotFound = -32601 + InvalidParams = -32602 + InternalError = -32603 +) + +// handleJSONRPC handles JSON-RPC requests +func (s *ForceInclusionServer) handleJSONRPC(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + s.writeError(w, nil, MethodNotFound, "method not allowed") + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + s.writeError(w, nil, ParseError, "failed to read request body") + return + } + defer r.Body.Close() + + var req JSONRPCRequest + if err := json.Unmarshal(body, &req); err != nil { + s.writeError(w, nil, ParseError, "invalid JSON") + return + } + + s.logger.Debug(). + Str("method", req.Method). + Interface("id", req.ID). + Msg("received JSON-RPC request") + + switch req.Method { + case "eth_sendRawTransaction": + s.handleSendRawTransaction(w, &req) + default: + s.proxyToExecutionRPC(w, &req, body) + } +} + +// handleChainID handles eth_chainId requests +func (s *ForceInclusionServer) handleChainID(w http.ResponseWriter, req *JSONRPCRequest) { + // Convert chain ID string to integer + chainIDInt, err := strconv.ParseUint(s.genesis.ChainID, 10, 64) + if err != nil { + s.writeError(w, req.ID, InternalError, fmt.Sprintf("invalid chain ID: %v", err)) + return + } + // Return the chain ID as a hex string prefixed with 0x + chainID := fmt.Sprintf("0x%x", chainIDInt) + s.writeSuccess(w, req.ID, chainID) +} + +// proxyToExecutionRPC forwards unknown RPC methods to the execution RPC endpoint +func (s *ForceInclusionServer) proxyToExecutionRPC(w http.ResponseWriter, req *JSONRPCRequest, body []byte) { + if s.executionRPCURL == "" { + s.writeError(w, req.ID, MethodNotFound, fmt.Sprintf("method %s not found", req.Method)) + return + } + + s.logger.Debug(). + Str("method", req.Method). + Str("execution_rpc_url", s.executionRPCURL). + Msg("proxying request to execution RPC") + + proxyReq, err := http.NewRequest(http.MethodPost, s.executionRPCURL, strings.NewReader(string(body))) + if err != nil { + s.writeError(w, req.ID, InternalError, fmt.Sprintf("failed to create proxy request: %v", err)) + return + } + + proxyReq.Header.Set("Content-Type", "application/json") + + resp, err := s.httpClient.Do(proxyReq) + if err != nil { + s.writeError(w, req.ID, InternalError, fmt.Sprintf("failed to proxy request: %v", err)) + return + } + defer resp.Body.Close() + + respBody, err := io.ReadAll(resp.Body) + if err != nil { + s.writeError(w, req.ID, InternalError, fmt.Sprintf("failed to read proxy response: %v", err)) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(resp.StatusCode) + if _, err := w.Write(respBody); err != nil { + s.logger.Error().Err(err).Msg("failed to write proxy response") + } +} + +// handleSendRawTransaction handles eth_sendRawTransaction requests +func (s *ForceInclusionServer) handleSendRawTransaction(w http.ResponseWriter, req *JSONRPCRequest) { + var params []string + if err := json.Unmarshal(req.Params, ¶ms); err != nil || len(params) != 1 { + s.writeError(w, req.ID, InvalidParams, "invalid params: expected [\"0x...\"]") + return + } + + txHex := params[0] + txData, err := s.decodeHexTx(txHex) + if err != nil { + s.writeError(w, req.ID, InvalidParams, fmt.Sprintf("invalid transaction hex: %v", err)) + return + } + + if len(txData) == 0 { + s.writeError(w, req.ID, InvalidParams, "transaction data cannot be empty") + return + } + + s.logger.Info(). + Int("tx_size", len(txData)). + Msg("submitting transaction to DA for force inclusion") + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + blobs := [][]byte{txData} + options := []byte(s.config.DA.SubmitOptions) + gasPrice := -1.0 // auto gas price + + ids, err := s.daClient.SubmitWithOptions(ctx, blobs, gasPrice, s.namespace, options) + if err != nil { + s.writeError(w, req.ID, InternalError, fmt.Sprintf("failed to submit to DA: %v", err)) + return + } + + if len(ids) == 0 { + s.writeError(w, req.ID, InternalError, "no DA IDs returned") + return + } + + // Extract height from the first ID + // IDs are structured with height in the first 8 bytes (little-endian uint64) + if len(ids[0]) < 8 { + s.writeError(w, req.ID, InternalError, "invalid DA ID format") + return + } + daHeight := binary.LittleEndian.Uint64(ids[0][:8]) + + s.logger.Info(). + Uint64("da_height", daHeight). + Msg("transaction successfully submitted to DA layer") + + _, epochEnd, _ := types.CalculateEpochBoundaries( + daHeight, + s.genesis.DAStartHeight, + s.genesis.DAEpochForcedInclusion, + ) + blocksUntilInclusion := epochEnd - (daHeight + 1) + + s.logger.Info(). + Uint64("blocks_until_inclusion", blocksUntilInclusion). + Uint64("inclusion_at_height", epochEnd+1). + Msg("transaction will be force included") + + // Return a transaction hash-like response + // We use the DA height as part of the response since we don't have the actual tx hash yet + txHash := fmt.Sprintf("0x%064x", daHeight) + s.writeSuccess(w, req.ID, txHash) +} + +// decodeHexTx decodes a hex-encoded Ethereum transaction +func (s *ForceInclusionServer) decodeHexTx(hexStr string) ([]byte, error) { + hexStr = strings.TrimSpace(hexStr) + + if !strings.HasPrefix(hexStr, "0x") && !strings.HasPrefix(hexStr, "0X") { + return nil, fmt.Errorf("hex string must start with 0x") + } + + hexStr = hexStr[2:] + + txBytes, err := hex.DecodeString(hexStr) + if err != nil { + return nil, fmt.Errorf("invalid hex: %w", err) + } + + return txBytes, nil +} + +// writeSuccess writes a successful JSON-RPC response +func (s *ForceInclusionServer) writeSuccess(w http.ResponseWriter, id interface{}, result interface{}) { + resp := JSONRPCResponse{ + JSONRPC: "2.0", + ID: id, + Result: result, + } + s.writeResponse(w, resp) +} + +// writeError writes an error JSON-RPC response +func (s *ForceInclusionServer) writeError(w http.ResponseWriter, id interface{}, code int, message string) { + resp := JSONRPCResponse{ + JSONRPC: "2.0", + ID: id, + Error: &RPCError{ + Code: code, + Message: message, + }, + } + s.writeResponse(w, resp) +} + +// writeResponse writes a JSON-RPC response +func (s *ForceInclusionServer) writeResponse(w http.ResponseWriter, resp JSONRPCResponse) { + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + s.logger.Error().Err(err).Msg("failed to write response") + } +} diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go new file mode 100644 index 0000000000..36c11492df --- /dev/null +++ b/apps/evm/server/force_inclusion_test.go @@ -0,0 +1,480 @@ +package server + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/rs/zerolog" + "gotest.tools/v3/assert" +) + +// mockDA implements da.DA interface for testing +type mockDA struct { + submitFunc func(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte, options []byte) ([]da.ID, error) +} + +func (m *mockDA) Submit(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte) ([]da.ID, error) { + return m.SubmitWithOptions(ctx, blobs, gasPrice, namespace, nil) +} + +func (m *mockDA) SubmitWithOptions(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte, options []byte) ([]da.ID, error) { + if m.submitFunc != nil { + return m.submitFunc(ctx, blobs, gasPrice, namespace, options) + } + return nil, nil +} + +func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Blob, error) { + return nil, nil +} + +func (m *mockDA) GetIDs(ctx context.Context, height uint64, namespace []byte) (*da.GetIDsResult, error) { + return nil, nil +} + +func (m *mockDA) GetProofs(ctx context.Context, ids []da.ID, namespace []byte) ([]da.Proof, error) { + return nil, nil +} + +func (m *mockDA) Commit(ctx context.Context, blobs []da.Blob, namespace []byte) ([]da.Commitment, error) { + return nil, nil +} + +func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) { + return nil, nil +} + +func (m *mockDA) MaxBlobSize(ctx context.Context) (uint64, error) { + return 2 * 1024 * 1024, nil +} + +// createTestID creates a test DA ID with the given height +func createTestID(height uint64) da.ID { + id := make([]byte, 8) + binary.LittleEndian.PutUint64(id, height) + return id +} + +func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) { + testHeight := uint64(100) + + mockDAClient := &mockDA{ + submitFunc: func(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte, options []byte) ([]da.ID, error) { + assert.Equal(t, 1, len(blobs)) + return []da.ID{createTestID(testHeight)}, nil + }, + } + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{ + DAStartHeight: 50, + DAEpochForcedInclusion: 10, + } + + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + // Create test request + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_sendRawTransaction", + Params: json.RawMessage(`["0x02f873010185012a05f20085012a05f2008252089400000000000000000000000000000000000000008080c080a0abcd1234567890abcdef1234567890abcdef1234567890abcdef1234567890aba0dcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210fe"]`), + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Equal(t, "2.0", jsonResp.JSONRPC) + assert.Assert(t, jsonResp.Error == nil) + assert.Assert(t, jsonResp.Result != nil) +} + +func TestForceInclusionServer_handleSendRawTransaction_InvalidParams(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + tests := []struct { + name string + params json.RawMessage + }{ + { + name: "empty params", + params: json.RawMessage(`[]`), + }, + { + name: "too many params", + params: json.RawMessage(`["0x123", "0x456"]`), + }, + { + name: "invalid hex", + params: json.RawMessage(`["not-hex"]`), + }, + { + name: "no 0x prefix", + params: json.RawMessage(`["123456"]`), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_sendRawTransaction", + Params: tt.params, + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Assert(t, jsonResp.Error != nil) + assert.Equal(t, InvalidParams, jsonResp.Error.Code) + }) + } +} + +func TestForceInclusionServer_handleSendRawTransaction_DAError(t *testing.T) { + mockDAClient := &mockDA{ + submitFunc: func(ctx context.Context, blobs []da.Blob, gasPrice float64, namespace []byte, options []byte) ([]da.ID, error) { + return nil, da.ErrBlobSizeOverLimit + }, + } + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_sendRawTransaction", + Params: json.RawMessage(`["0x02f873"]`), + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Assert(t, jsonResp.Error != nil) + assert.Equal(t, InternalError, jsonResp.Error.Code) +} + +func TestForceInclusionServer_handleJSONRPC_MethodNotFound(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getBalance", + Params: json.RawMessage(`[]`), + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Assert(t, jsonResp.Error != nil) + assert.Equal(t, MethodNotFound, jsonResp.Error.Code) +} + +func TestForceInclusionServer_handleJSONRPC_InvalidJSON(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader([]byte("invalid json"))) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Assert(t, jsonResp.Error != nil) + assert.Equal(t, ParseError, jsonResp.Error.Code) +} + +func TestForceInclusionServer_StartStop(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("127.0.0.1:0", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + ctx := context.Background() + err = server.Start(ctx) + assert.NilError(t, err) + + // Give server time to start + time.Sleep(200 * time.Millisecond) + + // Stop the server + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = server.Stop(stopCtx) + assert.NilError(t, err) +} + +func TestDecodeHexTx(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + tests := []struct { + name string + input string + expectError bool + }{ + { + name: "valid hex with 0x prefix", + input: "0x1234567890abcdef", + expectError: false, + }, + { + name: "valid hex with 0X prefix", + input: "0X1234567890ABCDEF", + expectError: false, + }, + { + name: "no 0x prefix", + input: "1234567890abcdef", + expectError: true, + }, + { + name: "invalid hex characters", + input: "0x123xyz", + expectError: true, + }, + { + name: "empty string", + input: "0x", + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := server.decodeHexTx(tt.input) + if tt.expectError { + assert.Assert(t, err != nil) + } else { + assert.NilError(t, err) + assert.Assert(t, result != nil) + } + }) + } +} + +func TestForceInclusionServer_ProxyToExecutionRPC(t *testing.T) { + mockDAClient := &mockDA{} + + // Create a mock execution RPC server + mockExecutionRPC := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req JSONRPCRequest + err := json.NewDecoder(r.Body).Decode(&req) + assert.NilError(t, err) + + // Return a mock response + resp := JSONRPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Result: "0x1234567890abcdef", + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(resp) + assert.NilError(t, err) + })) + defer mockExecutionRPC.Close() + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, mockExecutionRPC.URL) + assert.NilError(t, err) + + // Test proxying eth_getBalance + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getBalance", + Params: json.RawMessage(`["0x1234567890123456789012345678901234567890", "latest"]`), + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Equal(t, "2.0", jsonResp.JSONRPC) + assert.Assert(t, jsonResp.Error == nil) + assert.Equal(t, "0x1234567890abcdef", jsonResp.Result) +} + +func TestForceInclusionServer_ProxyWithoutExecutionRPC(t *testing.T) { + mockDAClient := &mockDA{} + + cfg := config.Config{ + DA: config.DAConfig{ + ForcedInclusionNamespace: "0x0000000000000000000000000000000000000000000000000000666f72636564", + }, + } + + gen := genesis.Genesis{} + logger := zerolog.New(zerolog.NewTestWriter(t)) + // Create server without execution RPC URL + server, err := NewForceInclusionServer("", mockDAClient, cfg, gen, logger, "") + assert.NilError(t, err) + + // Test that unknown methods return method not found when no execution RPC is configured + reqBody := JSONRPCRequest{ + JSONRPC: "2.0", + ID: 1, + Method: "eth_getBalance", + Params: json.RawMessage(`[]`), + } + + reqJSON, err := json.Marshal(reqBody) + assert.NilError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(reqJSON)) + w := httptest.NewRecorder() + + server.handleJSONRPC(w, req) + + resp := w.Result() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + var jsonResp JSONRPCResponse + err = json.NewDecoder(resp.Body).Decode(&jsonResp) + assert.NilError(t, err) + assert.Assert(t, jsonResp.Error != nil) + assert.Equal(t, MethodNotFound, jsonResp.Error.Code) +} diff --git a/apps/testapp/kv/kvexecutor.go b/apps/testapp/kv/kvexecutor.go index ad62e723fa..49d6c21e5e 100644 --- a/apps/testapp/kv/kvexecutor.go +++ b/apps/testapp/kv/kvexecutor.go @@ -221,7 +221,7 @@ func (k *KVExecutor) GetTxs(ctx context.Context) ([][]byte, error) { // ExecuteTxs processes each transaction assumed to be in the format "key=value". // It updates the database accordingly using a batch and removes the executed transactions from the mempool. -// If a transaction is malformed, an error is returned, and the database is not changed. +// Invalid transactions are filtered out and logged, but execution continues. func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, uint64, error) { select { case <-ctx.Done(): @@ -234,23 +234,41 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u return nil, 0, fmt.Errorf("failed to create database batch: %w", err) } + validTxCount := 0 + invalidTxCount := 0 + // Process transactions and stage them in the batch - for _, tx := range txs { + // Filter out invalid/gibberish transactions gracefully + for i, tx := range txs { + // Skip empty transactions + if len(tx) == 0 { + fmt.Printf("Warning: skipping empty transaction at index %d in block %d\n", i, blockHeight) + invalidTxCount++ + continue + } + parts := strings.SplitN(string(tx), "=", 2) if len(parts) != 2 { - return nil, 0, errors.New("malformed transaction; expected format key=value") + fmt.Printf("Warning: filtering out malformed transaction at index %d in block %d (expected format key=value): %s\n", i, blockHeight, string(tx)) + invalidTxCount++ + continue } + key := strings.TrimSpace(parts[0]) value := strings.TrimSpace(parts[1]) if key == "" { - return nil, 0, errors.New("empty key in transaction") + fmt.Printf("Warning: filtering out transaction with empty key at index %d in block %d\n", i, blockHeight) + invalidTxCount++ + continue } dsKey := getTxKey(blockHeight, key) // Prevent writing reserved keys via transactions if reservedKeys[dsKey] { - return nil, 0, fmt.Errorf("transaction attempts to modify reserved key: %s", key) + fmt.Printf("Warning: filtering out transaction attempting to modify reserved key at index %d in block %d: %s\n", i, blockHeight, key) + invalidTxCount++ + continue } err = batch.Put(ctx, dsKey, []byte(value)) @@ -258,6 +276,12 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u // This error is unlikely for Put unless the context is cancelled. return nil, 0, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err) } + validTxCount++ + } + + // Log filtering results if any transactions were filtered + if invalidTxCount > 0 { + fmt.Printf("Block %d: processed %d valid transactions, filtered out %d invalid transactions\n", blockHeight, validTxCount, invalidTxCount) } // Commit the batch to apply all changes atomically diff --git a/apps/testapp/kv/kvexecutor_test.go b/apps/testapp/kv/kvexecutor_test.go index a1c563f5f1..c10eff65a5 100644 --- a/apps/testapp/kv/kvexecutor_test.go +++ b/apps/testapp/kv/kvexecutor_test.go @@ -142,14 +142,46 @@ func TestExecuteTxs_Invalid(t *testing.T) { } ctx := context.Background() - // Prepare an invalid transaction (missing '=') + // According to the Executor interface: "Must handle gracefully gibberish transactions" + // Invalid transactions should be filtered out, not cause errors + + // Prepare invalid transactions (missing '=') txs := [][]byte{ []byte("invalidformat"), + []byte("another_invalid_one"), + []byte(""), } - _, _, err = exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) - if err == nil { - t.Fatal("Expected error for malformed transaction, got nil") + stateRoot, maxBytes, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte("")) + if err != nil { + t.Fatalf("ExecuteTxs should handle gibberish gracefully, got error: %v", err) + } + if maxBytes != 1024 { + t.Errorf("Expected maxBytes 1024, got %d", maxBytes) + } + + // State root should still be computed (empty block is valid) + if stateRoot == nil { + t.Error("Expected non-nil state root even with all invalid transactions") + } + + // Test mix of valid and invalid transactions + mixedTxs := [][]byte{ + []byte("valid_key=valid_value"), + []byte("invalidformat"), + []byte("another_valid=value2"), + []byte(""), + } + + stateRoot2, _, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot) + if err != nil { + t.Fatalf("ExecuteTxs should filter invalid transactions and process valid ones, got error: %v", err) + } + + // State root should contain only the valid transactions + rootStr := string(stateRoot2) + if !strings.Contains(rootStr, "valid_key:valid_value") || !strings.Contains(rootStr, "another_valid:value2") { + t.Errorf("State root should contain valid transactions: %s", rootStr) } } diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index e8292085fc..263d590815 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -338,8 +338,9 @@ func (e *Executor) produceBlock() error { } var ( - header *types.SignedHeader - data *types.Data + header *types.SignedHeader + data *types.Data + batchData *BatchData ) // Check if there's an already stored block at the newHeight @@ -353,7 +354,7 @@ func (e *Executor) produceBlock() error { return fmt.Errorf("failed to get block data: %w", err) } else { // get batch from sequencer - batchData, err := e.retrieveBatch(e.ctx) + batchData, err = e.retrieveBatch(e.ctx) if errors.Is(err, common.ErrNoBatch) { e.logger.Debug().Msg("no batch available") return nil @@ -381,7 +382,15 @@ func (e *Executor) produceBlock() error { } } - newState, err := e.applyBlock(e.ctx, header.Header, data) + // Pass force-included mask through context for execution optimization + // Force-included txs (from DA) MUST be validated as they're from untrusted sources + // Mempool txs can skip validation as they were validated when added to mempool + ctx := e.ctx + if batchData != nil && batchData.Batch != nil && batchData.ForceIncludedMask != nil { + ctx = coreexecutor.WithForceIncludedMask(ctx, batchData.ForceIncludedMask) + } + + newState, err := e.applyBlock(ctx, header.Header, data) if err != nil { return fmt.Errorf("failed to apply block: %w", err) } diff --git a/core/execution/context.go b/core/execution/context.go new file mode 100644 index 0000000000..c7757da1b8 --- /dev/null +++ b/core/execution/context.go @@ -0,0 +1,20 @@ +package execution + +import "context" + +type forceInclusionMaskContextKey struct{} + +// WithForceIncludedMask adds the force-included mask to the context +// The mask indicates which transactions are force-included from DA (true) vs mempool (false) +func WithForceIncludedMask(ctx context.Context, mask []bool) context.Context { + return context.WithValue(ctx, forceInclusionMaskContextKey{}, mask) +} + +// GetForceIncludedMask retrieves the force-included mask from the context +// Returns nil if no mask is present in the context +func GetForceIncludedMask(ctx context.Context) []bool { + if mask, ok := ctx.Value(forceInclusionMaskContextKey{}).([]bool); ok { + return mask + } + return nil +} diff --git a/core/execution/context_test.go b/core/execution/context_test.go new file mode 100644 index 0000000000..d13e1a64be --- /dev/null +++ b/core/execution/context_test.go @@ -0,0 +1,90 @@ +package execution_test + +import ( + "context" + "slices" + "testing" + + "github.com/evstack/ev-node/core/execution" +) + +// TestWithForceIncludedMask_ContextRoundtrip verifies that the mask +// can be stored in and retrieved from context correctly +func TestWithForceIncludedMask_ContextRoundtrip(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + mask []bool + }{ + { + name: "nil mask", + mask: nil, + }, + { + name: "empty mask", + mask: []bool{}, + }, + { + name: "single element", + mask: []bool{true}, + }, + { + name: "multiple elements", + mask: []bool{true, false, true, false, false}, + }, + { + name: "all true", + mask: []bool{true, true, true}, + }, + { + name: "all false", + mask: []bool{false, false, false}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + // Add mask to context + ctxWithMask := execution.WithForceIncludedMask(ctx, tt.mask) + + // Retrieve mask from context + retrieved := execution.GetForceIncludedMask(ctxWithMask) + + // Verify it matches + if !slices.Equal(tt.mask, retrieved) { + t.Errorf("expected mask %v, got %v", tt.mask, retrieved) + } + }) + } +} + +// TestGetForceIncludedMask_NoMask verifies that getting a mask from +// a context without one returns nil +func TestGetForceIncludedMask_NoMask(t *testing.T) { + t.Parallel() + + ctx := context.Background() + mask := execution.GetForceIncludedMask(ctx) + if mask != nil { + t.Errorf("expected nil mask from context without mask, got %v", mask) + } +} + +// TestGetForceIncludedMask_WrongType verifies that wrong types in context +// are handled gracefully +func TestGetForceIncludedMask_WrongType(t *testing.T) { + t.Parallel() + + // Create context with wrong type + ctx := context.WithValue(context.Background(), "force_included_mask", "wrong type") + + mask := execution.GetForceIncludedMask(ctx) + if mask != nil { + t.Errorf("expected nil mask when context has wrong type, got %v", mask) + } +} diff --git a/core/sequencer/sequencing.go b/core/sequencer/sequencing.go index e97ef93dd3..4aa2892263 100644 --- a/core/sequencer/sequencing.go +++ b/core/sequencer/sequencing.go @@ -39,6 +39,12 @@ type Sequencer interface { // Batch is a collection of transactions type Batch struct { Transactions [][]byte + // ForceIncludedMask indicates which transactions are force-included from DA + // If nil, all transactions should be validated (backward compatibility) + // If set, ForceIncludedMask[i] == true means Transactions[i] is force-included from DA + // and MUST be validated (untrusted source). ForceIncludedMask[i] == false means the + // transaction is from mempool and can skip validation (already validated on submission) + ForceIncludedMask []bool } // Hash returns the cryptographic hash of the batch diff --git a/execution/evm/execution.go b/execution/evm/execution.go index e6532bd627..33b466c054 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -251,12 +251,46 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { // ExecuteTxs executes the given transactions at the specified block height and timestamp func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) { - // convert evolve tx to hex strings for ev-reth - txsPayload := make([]string, len(txs)) + forceIncludedMask := execution.GetForceIncludedMask(ctx) + + // Filter out invalid transactions to handle gibberish gracefully + validTxs := make([]string, 0, len(txs)) for i, tx := range txs { - // Use the raw transaction bytes directly instead of re-encoding - txsPayload[i] = "0x" + hex.EncodeToString(tx) + if len(tx) == 0 { + continue + } + + // Skip validation for mempool transactions (already validated when added to mempool) + // Force-included transactions from DA MUST be validated as they come from untrusted sources + if forceIncludedMask != nil && i < len(forceIncludedMask) && !forceIncludedMask[i] { + validTxs = append(validTxs, "0x"+hex.EncodeToString(tx)) + continue + } + + // Validate that the transaction can be parsed as an Ethereum transaction + var ethTx types.Transaction + if err := ethTx.UnmarshalBinary(tx); err != nil { + c.logger.Debug(). + Int("tx_index", i). + Uint64("block_height", blockHeight). + Err(err). + Str("tx_hex", "0x"+hex.EncodeToString(tx)). + Msg("filtering out invalid transaction (gibberish)") + continue + } + + validTxs = append(validTxs, "0x"+hex.EncodeToString(tx)) + } + + if len(validTxs) < len(txs) { + c.logger.Debug(). + Int("total_txs", len(txs)). + Int("valid_txs", len(validTxs)). + Int("filtered_txs", len(txs)-len(validTxs)). + Uint64("block_height", blockHeight). + Msg("filtered out invalid transactions") } + txsPayload := validTxs prevBlockHash, _, prevGasLimit, _, err := c.getBlockInfo(ctx, blockHeight-1) if err != nil { diff --git a/execution/evm/force_included_test.go b/execution/evm/force_included_test.go new file mode 100644 index 0000000000..9c4eb3f8d0 --- /dev/null +++ b/execution/evm/force_included_test.go @@ -0,0 +1,162 @@ +package evm + +import ( + "context" + "encoding/hex" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + coreexecution "github.com/evstack/ev-node/core/execution" +) + +// TestExecuteTxs_ForceIncludedMask verifies that force-included transactions +// are validated while mempool transactions skip validation (already validated on submission) +func TestExecuteTxs_ForceIncludedMask(t *testing.T) { + t.Parallel() + + // Create a valid Ethereum transaction for testing + validTx := types.NewTransaction( + 0, // nonce + common.HexToAddress("0x1234567890123456789012345678901234567890"), // to + common.Big0, // value + 21000, // gas + common.Big1, // gasPrice + nil, // data + ) + validTxBytes, err := validTx.MarshalBinary() + require.NoError(t, err) + + // Create invalid transaction bytes (gibberish) + invalidTxBytes := []byte("this is not a valid transaction") + + tests := []struct { + name string + txs [][]byte + mask []bool + expectedValidTxs int + expectedFilterMsg string + }{ + { + name: "all transactions validated when no mask", + txs: [][]byte{ + validTxBytes, + validTxBytes, + invalidTxBytes, // Should be filtered + validTxBytes, + }, + mask: nil, // No mask = validate all + expectedValidTxs: 3, // 3 valid, 1 invalid filtered + }, + { + name: "force-included invalid tx must be validated (filtered out)", + txs: [][]byte{ + invalidTxBytes, // Force-included, MUST be validated - will be filtered + validTxBytes, // Mempool tx, skips validation + }, + mask: []bool{true, false}, + expectedValidTxs: 1, // Only mempool tx passes (force-included filtered) + }, + { + name: "mixed force-included and mempool transactions", + txs: [][]byte{ + validTxBytes, // Force-included, validated (valid) + validTxBytes, // Force-included, validated (valid) + invalidTxBytes, // Mempool tx, skips validation (passes through) + validTxBytes, // Mempool tx, skips validation (passes through) + }, + mask: []bool{true, true, false, false}, + expectedValidTxs: 4, // 2 valid force-included + 2 mempool txs + }, + { + name: "all force-included transactions must be validated", + txs: [][]byte{ + invalidTxBytes, // Force-included gibberish - filtered + invalidTxBytes, // Force-included gibberish - filtered + invalidTxBytes, // Force-included gibberish - filtered + }, + mask: []bool{true, true, true}, + expectedValidTxs: 0, // All filtered out (invalid) + }, + { + name: "all mempool transactions skip validation", + txs: [][]byte{ + validTxBytes, + validTxBytes, + invalidTxBytes, // Skips validation, passes through + }, + mask: []bool{false, false, false}, + expectedValidTxs: 3, // All pass (validation skipped) + }, + { + name: "empty mask same as no mask", + txs: [][]byte{ + validTxBytes, + invalidTxBytes, // Should be filtered + }, + mask: []bool{}, + expectedValidTxs: 1, // 1 valid, 1 filtered + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + // Create context with or without mask + ctx := context.Background() + if tt.mask != nil { + ctx = coreexecution.WithForceIncludedMask(ctx, tt.mask) + } + + // Call the validation logic by inspecting how transactions are filtered + // We'll extract the logic to count valid transactions + forceIncludedMask := coreexecution.GetForceIncludedMask(ctx) + validTxs := make([]string, 0, len(tt.txs)) + skippedValidation := 0 + + for i, tx := range tt.txs { + if len(tx) == 0 { + continue + } + + // Skip validation for mempool transactions (already validated when added to mempool) + // Force-included transactions from DA MUST be validated + if forceIncludedMask != nil && i < len(forceIncludedMask) && !forceIncludedMask[i] { + validTxs = append(validTxs, "0x"+hex.EncodeToString(tx)) + skippedValidation++ + continue + } + + // Validate force-included transactions (and all txs when no mask) + var ethTx types.Transaction + if err := ethTx.UnmarshalBinary(tx); err != nil { + // Invalid transaction, skip it + continue + } + + validTxs = append(validTxs, "0x"+hex.EncodeToString(tx)) + } + + // Verify expected number of valid transactions + assert.Equal(t, tt.expectedValidTxs, len(validTxs), + "unexpected number of valid transactions") + + // Verify mempool transactions were actually skipped + if tt.mask != nil { + expectedSkipped := 0 + for i, isForceIncluded := range tt.mask { + // Skip when NOT force-included (i.e., mempool tx) + if !isForceIncluded && i < len(tt.txs) && len(tt.txs[i]) > 0 { + expectedSkipped++ + } + } + assert.Equal(t, expectedSkipped, skippedValidation, + "unexpected number of skipped validations") + } + }) + } +} diff --git a/execution/evm/go.mod b/execution/evm/go.mod index 9451e35db7..44de474d09 100644 --- a/execution/evm/go.mod +++ b/execution/evm/go.mod @@ -59,3 +59,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/evstack/ev-node/core => ../../core diff --git a/execution/evm/go.sum b/execution/evm/go.sum index 73bf623b00..6c4c34a7a3 100644 --- a/execution/evm/go.sum +++ b/execution/evm/go.sum @@ -60,8 +60,6 @@ github.com/ethereum/go-ethereum v1.16.7 h1:qeM4TvbrWK0UC0tgkZ7NiRsmBGwsjqc64BHo2 github.com/ethereum/go-ethereum v1.16.7/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node/core v1.0.0-beta.5 h1:lgxE8XiF3U9pcFgh7xuKMgsOGvLBGRyd9kc9MR4WL0o= -github.com/evstack/ev-node/core v1.0.0-beta.5/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= diff --git a/execution/evm/test/go.mod b/execution/evm/test/go.mod index d8e1e56113..dd316bf51e 100644 --- a/execution/evm/test/go.mod +++ b/execution/evm/test/go.mod @@ -188,4 +188,7 @@ require ( sigs.k8s.io/yaml v1.4.0 // indirect ) -replace github.com/evstack/ev-node/execution/evm => ../ +replace ( + github.com/evstack/ev-node/core => ../../../core + github.com/evstack/ev-node/execution/evm => ../ +) diff --git a/execution/evm/test/go.sum b/execution/evm/test/go.sum index 6834d98665..4abad38218 100644 --- a/execution/evm/test/go.sum +++ b/execution/evm/test/go.sum @@ -190,8 +190,6 @@ github.com/ethereum/go-ethereum v1.16.7 h1:qeM4TvbrWK0UC0tgkZ7NiRsmBGwsjqc64BHo2 github.com/ethereum/go-ethereum v1.16.7/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node/core v1.0.0-beta.5 h1:lgxE8XiF3U9pcFgh7xuKMgsOGvLBGRyd9kc9MR4WL0o= -github.com/evstack/ev-node/core v1.0.0-beta.5/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= diff --git a/sequencers/based/sequencer.go b/sequencers/based/sequencer.go index e209365db4..7beab43b57 100644 --- a/sequencers/based/sequencer.go +++ b/sequencers/based/sequencer.go @@ -156,7 +156,16 @@ func (s *BasedSequencer) createBatchFromQueue(maxBytes uint64) *coresequencer.Ba } } - return &coresequencer.Batch{Transactions: batch} + // Mark all transactions as force-included since based sequencer only pulls from DA + forceIncludedMask := make([]bool, len(batch)) + for i := range forceIncludedMask { + forceIncludedMask[i] = true + } + + return &coresequencer.Batch{ + Transactions: batch, + ForceIncludedMask: forceIncludedMask, + } } // VerifyBatch verifies a batch of transactions diff --git a/sequencers/single/sequencer.go b/sequencers/single/sequencer.go index f887d32d69..ea60cf003c 100644 --- a/sequencers/single/sequencer.go +++ b/sequencers/single/sequencer.go @@ -203,6 +203,13 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB batch.Transactions = append(forcedTxs, trimmedBatchTxs...) + // Create ForceIncludedMask: true for forced txs, false for mempool txs. + // Forced included txs are always first in the batch. + batch.ForceIncludedMask = make([]bool, len(batch.Transactions)) + for i := range len(forcedTxs) { + batch.ForceIncludedMask[i] = true + } + c.logger.Debug(). Int("forced_tx_count", len(forcedTxs)). Int("forced_txs_size", forcedTxsSize). @@ -211,6 +218,9 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB Int("total_tx_count", len(batch.Transactions)). Int("total_size", forcedTxsSize+currentBatchSize). Msg("combined forced inclusion and batch transactions") + } else if len(batch.Transactions) > 0 { + // No forced txs, but we have mempool txs - mark all as non-force-included + batch.ForceIncludedMask = make([]bool, len(batch.Transactions)) } return &coresequencer.GetNextBatchResponse{ diff --git a/test/e2e/go.mod b/test/e2e/go.mod index 85418bfbe8..7e6403deeb 100644 --- a/test/e2e/go.mod +++ b/test/e2e/go.mod @@ -16,6 +16,7 @@ require ( replace ( github.com/evstack/ev-node => ../../ + github.com/evstack/ev-node/core => ../../core github.com/evstack/ev-node/execution/evm => ../../execution/evm github.com/evstack/ev-node/execution/evm/test => ../../execution/evm/test ) diff --git a/test/e2e/go.sum b/test/e2e/go.sum index f4429f538c..cf58c83497 100644 --- a/test/e2e/go.sum +++ b/test/e2e/go.sum @@ -198,8 +198,6 @@ github.com/ethereum/go-ethereum v1.16.7 h1:qeM4TvbrWK0UC0tgkZ7NiRsmBGwsjqc64BHo2 github.com/ethereum/go-ethereum v1.16.7/go.mod h1:Fs6QebQbavneQTYcA39PEKv2+zIjX7rPUZ14DER46wk= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= -github.com/evstack/ev-node/core v1.0.0-beta.5 h1:lgxE8XiF3U9pcFgh7xuKMgsOGvLBGRyd9kc9MR4WL0o= -github.com/evstack/ev-node/core v1.0.0-beta.5/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=