diff --git a/Makefile b/Makefile index daae8123920..6104b235f5b 100644 --- a/Makefile +++ b/Makefile @@ -126,6 +126,10 @@ op-supernode: ## Builds op-supernode binary just $(JUSTFLAGS) ./op-supernode/op-supernode .PHONY: op-supernode +op-interop-filter: ## Builds op-interop-filter binary + just $(JUSTFLAGS) ./op-interop-filter/op-interop-filter +.PHONY: op-interop-filter + op-program: ## Builds op-program binary make -C ./op-program op-program .PHONY: op-program diff --git a/docker-bake.hcl b/docker-bake.hcl index d19ab640f31..58b784ae63e 100644 --- a/docker-bake.hcl +++ b/docker-bake.hcl @@ -73,6 +73,10 @@ variable "OP_SUPERNODE_VERSION" { default = "${GIT_VERSION}" } +variable "OP_INTEROP_FILTER_VERSION" { + default = "${GIT_VERSION}" +} + variable "OP_TEST_SEQUENCER_VERSION" { default = "${GIT_VERSION}" } @@ -233,6 +237,19 @@ target "op-supernode" { tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-supernode:${tag}"] } +target "op-interop-filter" { + dockerfile = "ops/docker/op-stack-go/Dockerfile" + context = "." + args = { + GIT_COMMIT = "${GIT_COMMIT}" + GIT_DATE = "${GIT_DATE}" + OP_INTEROP_FILTER_VERSION = "${OP_INTEROP_FILTER_VERSION}" + } + target = "op-interop-filter-target" + platforms = split(",", PLATFORMS) + tags = [for tag in split(",", IMAGE_TAGS) : "${REGISTRY}/${REPOSITORY}/op-interop-filter:${tag}"] +} + target "op-test-sequencer" { dockerfile = "ops/docker/op-stack-go/Dockerfile" context = "." diff --git a/op-interop-filter/.gitignore b/op-interop-filter/.gitignore new file mode 100644 index 00000000000..cf2e9ae1a78 --- /dev/null +++ b/op-interop-filter/.gitignore @@ -0,0 +1,2 @@ +# Dev environment with API keys +.env.dev diff --git a/op-interop-filter/Makefile b/op-interop-filter/Makefile new file mode 100644 index 00000000000..9d1abda16ca --- /dev/null +++ b/op-interop-filter/Makefile @@ -0,0 +1,3 @@ +DEPRECATED_TARGETS := op-interop-filter clean test + +include ../justfiles/deprecated.mk diff --git a/op-interop-filter/README.md b/op-interop-filter/README.md new file mode 100644 index 00000000000..3b3f32eee8b --- /dev/null +++ b/op-interop-filter/README.md @@ -0,0 +1,26 @@ +# op-interop-filter + +A lightweight service that validates interop executing messages for op-geth or op-reth transaction filtering. + +Any reorg will trigger the failsafe which disables all interop transactions. + +## Usage + +### Build from source + +```bash +just op-interop-filter +./bin/op-interop-filter --help +``` + +### Run from source + +```bash +go run ./cmd --help +``` + +### Build docker image + +```bash +docker buildx bake op-interop-filter +``` diff --git a/op-interop-filter/cmd/dashboard/main.go b/op-interop-filter/cmd/dashboard/main.go new file mode 100644 index 00000000000..d2ae7987215 --- /dev/null +++ b/op-interop-filter/cmd/dashboard/main.go @@ -0,0 +1,560 @@ +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "sort" + "strconv" + "strings" + "syscall" + "time" + + "github.com/urfave/cli/v2" +) + +// chainNames maps chain IDs to human-readable names +var chainNames = map[string]string{ + "11155420": "OP Sepolia", + "1301": "Unichain Sepolia", + "10": "OP Mainnet", + "1": "Ethereum", + "8453": "Base", + "84532": "Base Sepolia", +} + +// chainExplorers maps chain IDs to Blockscout explorer base URLs +var chainExplorers = map[string]string{ + "11155420": "https://optimism-sepolia.blockscout.com", + "1301": "https://unichain-sepolia.blockscout.com", + "10": "https://optimism.blockscout.com", + "8453": "https://base.blockscout.com", + "84532": "https://base-sepolia.blockscout.com", +} + +// RecentQuery matches the spammer's RecentQuery struct +// Note: ChainID is string because eth.ChainID serializes to decimal string via MarshalText +type RecentQuery struct { + Timestamp string `json:"timestamp"` + ChainID string `json:"chain_id"` + BlockNumber uint64 `json:"block_number"` + TxHash string `json:"tx_hash"` + LogIndex uint `json:"log_index"` + Address string `json:"address"` + QueryType string `json:"query_type"` + Result string `json:"result"` +} + +var ( + FilterMetricsFlag = &cli.StringFlag{ + Name: "filter-metrics", + Usage: "Filter service metrics endpoint", + Value: "http://localhost:7300/metrics", + EnvVars: []string{"FILTER_METRICS"}, + } + SpammerMetricsFlag = &cli.StringFlag{ + Name: "spammer-metrics", + Usage: "Spammer metrics endpoints (comma-separated for multiple chains)", + Value: "http://localhost:7301/metrics", + EnvVars: []string{"SPAMMER_METRICS"}, + } + RefreshIntervalFlag = &cli.StringFlag{ + Name: "refresh", + Usage: "Refresh interval (e.g., 1s, 2s)", + Value: "2s", + EnvVars: []string{"REFRESH_INTERVAL"}, + } +) + +type Metrics struct { + // Filter metrics + FilterUp float64 + FilterFailsafe float64 + FilterChainReady map[string]float64 + FilterChainHead map[string]float64 + FilterBackfillProg map[string]float64 + FilterCheckSuccess float64 + FilterCheckFailed float64 + FilterReorgs map[string]float64 + + // LogsDB metrics + LogsDBFirstBlock map[string]float64 + LogsDBBlocksSealed map[string]float64 + LogsDBLogsAdded map[string]float64 + LogsDBEntries map[string]float64 + + // Spammer metrics + SpammerUp float64 + SpammerValidAccepted float64 + SpammerValidRejected float64 + SpammerInvalidAccepted float64 + SpammerInvalidRejected float64 + SpammerErrors float64 + SpammerLatencyValid float64 + SpammerLatencyInvalid float64 +} + +func main() { + app := cli.NewApp() + app.Name = "filter-dashboard" + app.Usage = "Terminal dashboard for op-interop-filter observability" + app.Flags = []cli.Flag{ + FilterMetricsFlag, + SpammerMetricsFlag, + RefreshIntervalFlag, + } + app.Action = run + + if err := app.Run(os.Args); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +func run(cliCtx *cli.Context) error { + filterURL := cliCtx.String(FilterMetricsFlag.Name) + spammerURLsStr := cliCtx.String(SpammerMetricsFlag.Name) + refreshStr := cliCtx.String(RefreshIntervalFlag.Name) + + // Parse comma-separated spammer URLs + spammerURLs := strings.Split(spammerURLsStr, ",") + for i := range spammerURLs { + spammerURLs[i] = strings.TrimSpace(spammerURLs[i]) + } + + refresh, err := time.ParseDuration(refreshStr) + if err != nil { + return fmt.Errorf("invalid refresh interval: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle interrupt + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + cancel() + }() + + startTime := time.Now() + ticker := time.NewTicker(refresh) + defer ticker.Stop() + + // Initial render + render(filterURL, spammerURLs, startTime) + + for { + select { + case <-ctx.Done(): + clearScreen() + fmt.Println("Dashboard stopped.") + return nil + case <-ticker.C: + render(filterURL, spammerURLs, startTime) + } + } +} + +func render(filterURL string, spammerURLs []string, startTime time.Time) { + clearScreen() + + m := fetchAllMetrics(filterURL, spammerURLs) + uptime := time.Since(startTime).Round(time.Second) + + // Header + fmt.Println("╔══════════════════════════════════════════════════════════════════════════════╗") + printLine(" OP-INTEROP-FILTER DASHBOARD") + printLine(fmt.Sprintf(" Uptime: %-20s %s", uptime, time.Now().Format("15:04:05"))) + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + + // Filter Service Status + filterStatus := "[DOWN]" + if m.FilterUp > 0 { + filterStatus = "[UP]" + } + failsafeStatus := "[OK]" + if m.FilterFailsafe > 0 { + failsafeStatus = "[TRIGGERED]" + } + + printLine(" FILTER SERVICE") + printLine(fmt.Sprintf(" Status: %-10s Failsafe: %-12s", filterStatus, failsafeStatus)) + + // Chain status (sorted for consistent display) + if len(m.FilterChainHead) > 0 { + chainIDs := make([]string, 0, len(m.FilterChainHead)) + for chainID := range m.FilterChainHead { + chainIDs = append(chainIDs, chainID) + } + sort.Strings(chainIDs) + + for _, chainID := range chainIDs { + head := m.FilterChainHead[chainID] + ready := "..." + if r, ok := m.FilterChainReady[chainID]; ok && r > 0 { + ready = "[ready]" + } + backfill := 0.0 + if bf, ok := m.FilterBackfillProg[chainID]; ok { + backfill = bf * 100 + } + reorgs := 0.0 + if r, ok := m.FilterReorgs[chainID]; ok { + reorgs = r + } + name := chainName(chainID) + printLine(fmt.Sprintf(" %-18s %-7s Head=%-10.0f Backfill=%5.1f%% Reorgs=%.0f", + name, ready, head, backfill, reorgs)) + } + } else { + printLine(" No chain data available") + } + + // Total requests + totalChecks := m.FilterCheckSuccess + m.FilterCheckFailed + printLine(fmt.Sprintf(" Requests: %.0f total", totalChecks)) + + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + + // LogsDB Stats (sorted for consistent display) + printLine(" LOGSDB") + if len(m.LogsDBBlocksSealed) > 0 { + chainIDs := make([]string, 0, len(m.LogsDBBlocksSealed)) + for chainID := range m.LogsDBBlocksSealed { + chainIDs = append(chainIDs, chainID) + } + sort.Strings(chainIDs) + + for _, chainID := range chainIDs { + firstBlock := m.LogsDBFirstBlock[chainID] + blocksSealed := m.LogsDBBlocksSealed[chainID] + logsAdded := m.LogsDBLogsAdded[chainID] + entries := m.LogsDBEntries[chainID] + name := chainName(chainID) + printLine(fmt.Sprintf(" %-18s Blocks=%-8.0f Logs=%-8.0f Entries=%-8.0f", + name, blocksSealed, logsAdded, entries)) + printLine(fmt.Sprintf(" First Block=%-10.0f", firstBlock)) + } + } else { + printLine(" No LogsDB data available") + } + + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + + // Spammer Status + spammerStatus := "[DOWN]" + if m.SpammerUp > 0 { + spammerStatus = "[UP]" + } + + printLine(" SPAMMER") + printLine(fmt.Sprintf(" Status: %-10s Errors: %.0f", spammerStatus, m.SpammerErrors)) + + // Query stats + validTotal := m.SpammerValidAccepted + m.SpammerValidRejected + invalidTotal := m.SpammerInvalidAccepted + m.SpammerInvalidRejected + allQueries := validTotal + invalidTotal + + printLine(fmt.Sprintf(" Valid queries: %-6.0f (accepted: %.0f, rejected: %.0f)", + validTotal, m.SpammerValidAccepted, m.SpammerValidRejected)) + printLine(fmt.Sprintf(" Invalid queries: %-6.0f (accepted: %.0f, rejected: %.0f)", + invalidTotal, m.SpammerInvalidAccepted, m.SpammerInvalidRejected)) + + // Calculate correctness + correct := m.SpammerValidAccepted + m.SpammerInvalidRejected + correctRate := 0.0 + if allQueries > 0 { + correctRate = (correct / allQueries) * 100 + } + + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + printLine(" SUMMARY") + printLine(fmt.Sprintf(" Total Queries: %-8.0f Correctness: %6.2f%%", allQueries, correctRate)) + + // Progress bar for correctness + barWidth := 40 + filled := int(correctRate / 100 * float64(barWidth)) + bar := strings.Repeat("=", filled) + strings.Repeat("-", barWidth-filled) + printLine(fmt.Sprintf(" [%s]", bar)) + + fmt.Println("╠══════════════════════════════════════════════════════════════════════════════╣") + + // Recent queries - grouped by chain + printLine(" RECENT QUERIES (newest first)") + + // Fetch queries from all spammer endpoints + recentQueryList := fetchRecentQueriesFromAll(spammerURLs) + + // Group by chain ID (string since eth.ChainID serializes to decimal string) + queriesByChain := make(map[string][]RecentQuery) + for _, q := range recentQueryList { + queriesByChain[q.ChainID] = append(queriesByChain[q.ChainID], q) + } + + if len(queriesByChain) > 0 { + // Sort chain IDs for consistent display + chainIDs := make([]string, 0, len(queriesByChain)) + for chainID := range queriesByChain { + chainIDs = append(chainIDs, chainID) + } + sort.Strings(chainIDs) + + for _, chainID := range chainIDs { + queries := queriesByChain[chainID] + name := chainName(chainID) + printLine(fmt.Sprintf(" --- %s ---", name)) + + // Show up to 3 per chain + limit := 3 + if len(queries) < limit { + limit = len(queries) + } + for i := 0; i < limit; i++ { + q := queries[i] + icon := "OK" + if (q.QueryType == "valid" && q.Result == "rejected") || + (q.QueryType == "invalid" && q.Result == "accepted") { + icon = "!!" + } + // Shorten tx hash for display + txShort := q.TxHash + if len(txShort) > 18 { + txShort = txShort[:10] + "..." + txShort[len(txShort)-6:] + } + printLine(fmt.Sprintf(" [%s] %s blk=%-8d log=%-3d tx=%s", + icon, q.Timestamp, q.BlockNumber, q.LogIndex, txShort)) + // Show address and result on second line + addrShort := q.Address + if len(addrShort) > 22 { + addrShort = addrShort[:12] + "..." + addrShort[len(addrShort)-8:] + } + printLine(fmt.Sprintf(" addr=%s type=%-7s result=%-8s", + addrShort, q.QueryType, q.Result)) + // Show explorer link on third line + if explorer, ok := chainExplorers[q.ChainID]; ok { + printLine(fmt.Sprintf(" %s/tx/%s", explorer, q.TxHash)) + } + } + } + } else { + printLine(" No recent queries") + } + + fmt.Println("╚══════════════════════════════════════════════════════════════════════════════╝") + fmt.Println("Press Ctrl+C to exit") +} + +func clearScreen() { + fmt.Print("\033[H\033[2J") +} + +// boxWidth is the inner width of the dashboard box (between the ║ characters) +const boxWidth = 76 + +// printLine prints a line with proper padding to align the right border +// It accounts for emoji display width (emojis display as 2 chars but are 1 rune) +func printLine(content string) { + // Count display width (emojis count as 2) + displayWidth := 0 + for _, r := range content { + if r > 0x1F600 || (r >= 0x2600 && r <= 0x27BF) || (r >= 0x1F300 && r <= 0x1F9FF) { + displayWidth += 2 // emoji takes 2 display columns + } else { + displayWidth += 1 + } + } + + padding := boxWidth - displayWidth + if padding < 0 { + padding = 0 + } + fmt.Printf("║%s%s║\n", content, strings.Repeat(" ", padding)) +} + +func fetchAllMetrics(filterURL string, spammerURLs []string) Metrics { + m := Metrics{ + FilterChainReady: make(map[string]float64), + FilterChainHead: make(map[string]float64), + FilterBackfillProg: make(map[string]float64), + FilterReorgs: make(map[string]float64), + LogsDBFirstBlock: make(map[string]float64), + LogsDBBlocksSealed: make(map[string]float64), + LogsDBLogsAdded: make(map[string]float64), + LogsDBEntries: make(map[string]float64), + } + + // Fetch filter metrics + filterMetrics := fetchMetrics(filterURL) + m.FilterUp = filterMetrics["op_interop_filter_up"] + m.FilterFailsafe = filterMetrics["op_interop_filter_failsafe_enabled"] + m.FilterCheckSuccess = filterMetrics["op_interop_filter_check_access_list_total{success=\"true\"}"] + m.FilterCheckFailed = filterMetrics["op_interop_filter_check_access_list_total{success=\"false\"}"] + + // Parse chain-specific metrics + for k, v := range filterMetrics { + if strings.HasPrefix(k, "op_interop_filter_chain_ready{") { + chainID := extractLabel(k, "chain_id") + m.FilterChainReady[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_chain_head{") { + chainID := extractLabel(k, "chain_id") + m.FilterChainHead[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_backfill_progress{") { + chainID := extractLabel(k, "chain_id") + m.FilterBackfillProg[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_reorg_detected_total{") { + chainID := extractLabel(k, "chain_id") + m.FilterReorgs[chainID] = v + } + // LogsDB metrics + if strings.HasPrefix(k, "op_interop_filter_logsdb_first_block{") { + chainID := extractLabel(k, "chain_id") + m.LogsDBFirstBlock[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_logsdb_blocks_sealed_total{") { + chainID := extractLabel(k, "chain_id") + m.LogsDBBlocksSealed[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_logsdb_logs_added_total{") { + chainID := extractLabel(k, "chain_id") + m.LogsDBLogsAdded[chainID] = v + } + if strings.HasPrefix(k, "op_interop_filter_logsdb_entries{") { + chainID := extractLabel(k, "chain_id") + m.LogsDBEntries[chainID] = v + } + } + + // Fetch spammer metrics from all spammer endpoints and aggregate + for _, spammerURL := range spammerURLs { + spammerMetrics := fetchMetrics(spammerURL) + // Take the max of "up" values (any spammer up = spammer up) + if spammerMetrics["filter_spammer_up"] > m.SpammerUp { + m.SpammerUp = spammerMetrics["filter_spammer_up"] + } + // Sum errors and query counts across all spammers + m.SpammerErrors += spammerMetrics["filter_spammer_errors_total"] + m.SpammerValidAccepted += findMetricWithLabels(spammerMetrics, "filter_spammer_queries_total{", + map[string]string{"type": "valid", "result": "accepted"}) + m.SpammerValidRejected += findMetricWithLabels(spammerMetrics, "filter_spammer_queries_total{", + map[string]string{"type": "valid", "result": "rejected"}) + m.SpammerInvalidAccepted += findMetricWithLabels(spammerMetrics, "filter_spammer_queries_total{", + map[string]string{"type": "invalid", "result": "accepted"}) + m.SpammerInvalidRejected += findMetricWithLabels(spammerMetrics, "filter_spammer_queries_total{", + map[string]string{"type": "invalid", "result": "rejected"}) + } + + return m +} + +func fetchMetrics(url string) map[string]float64 { + result := make(map[string]float64) + + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(url) + if err != nil { + return result + } + defer resp.Body.Close() + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "#") || line == "" { + continue + } + + // Parse prometheus text format: metric_name{labels} value + parts := strings.Fields(line) + if len(parts) < 2 { + continue + } + + metricName := parts[0] + value, err := strconv.ParseFloat(parts[1], 64) + if err != nil { + continue + } + + result[metricName] = value + } + + return result +} + +func extractLabel(metric, labelName string) string { + // Extract label value from metric like: metric_name{chain_id="123"} + start := strings.Index(metric, labelName+"=\"") + if start == -1 { + return "" + } + start += len(labelName) + 2 + end := strings.Index(metric[start:], "\"") + if end == -1 { + return "" + } + return metric[start : start+end] +} + +// chainName returns a human-readable name for a chain ID +func chainName(chainID string) string { + if name, ok := chainNames[chainID]; ok { + return name + } + return "Chain " + chainID +} + +// hasLabels checks if a metric has the specified label values (order-independent) +func hasLabels(metric string, labels map[string]string) bool { + for k, v := range labels { + if extractLabel(metric, k) != v { + return false + } + } + return true +} + +// findMetricWithLabels finds a metric value with specific labels +func findMetricWithLabels(metrics map[string]float64, prefix string, labels map[string]string) float64 { + for k, v := range metrics { + if strings.HasPrefix(k, prefix) && hasLabels(k, labels) { + return v + } + } + return 0 +} + +func fetchRecentQueries(spammerURL string) []RecentQuery { + // Convert metrics URL to recent URL (replace /metrics with /recent) + recentURL := strings.Replace(spammerURL, "/metrics", "/recent", 1) + + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get(recentURL) + if err != nil { + return nil + } + defer resp.Body.Close() + + var queries []RecentQuery + if err := json.NewDecoder(resp.Body).Decode(&queries); err != nil { + return nil + } + return queries +} + +func fetchRecentQueriesFromAll(spammerURLs []string) []RecentQuery { + var allQueries []RecentQuery + for _, url := range spammerURLs { + queries := fetchRecentQueries(url) + allQueries = append(allQueries, queries...) + } + return allQueries +} diff --git a/op-interop-filter/cmd/main.go b/op-interop-filter/cmd/main.go new file mode 100644 index 00000000000..2286a589c94 --- /dev/null +++ b/op-interop-filter/cmd/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "os" + + "github.com/ethereum/go-ethereum/log" + "github.com/urfave/cli/v2" + + opservice "github.com/ethereum-optimism/optimism/op-service" + "github.com/ethereum-optimism/optimism/op-service/cliapp" + "github.com/ethereum-optimism/optimism/op-service/ctxinterrupt" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum-optimism/optimism/op-service/metrics/doc" + + "github.com/ethereum-optimism/optimism/op-interop-filter/filter" + "github.com/ethereum-optimism/optimism/op-interop-filter/flags" + "github.com/ethereum-optimism/optimism/op-interop-filter/metrics" +) + +var ( + Version = "v0.0.0" + GitCommit = "" + GitDate = "" +) + +func main() { + oplog.SetupDefaults() + + app := cli.NewApp() + app.Flags = cliapp.ProtectFlags(flags.Flags) + app.Version = opservice.FormatVersion(Version, GitCommit, GitDate, "") + app.Name = "op-interop-filter" + app.Usage = "Interop transaction filter service" + app.Description = "Validates interop executing messages for op-geth transaction filtering" + app.Action = cliapp.LifecycleCmd(filter.Main(app.Version)) + app.Commands = []*cli.Command{ + { + Name: "doc", + Subcommands: doc.NewSubcommands(metrics.NewMetrics("default")), + }, + } + + ctx := ctxinterrupt.WithSignalWaiterMain(context.Background()) + err := app.RunContext(ctx, os.Args) + if err != nil { + log.Crit("Application failed", "message", err) + } +} diff --git a/op-interop-filter/cmd/spammer/main.go b/op-interop-filter/cmd/spammer/main.go new file mode 100644 index 00000000000..b2b59def862 --- /dev/null +++ b/op-interop-filter/cmd/spammer/main.go @@ -0,0 +1,671 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/urfave/cli/v2" + + opservice "github.com/ethereum-optimism/optimism/op-service" + "github.com/ethereum-optimism/optimism/op-service/cliapp" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/ctxinterrupt" + "github.com/ethereum-optimism/optimism/op-service/eth" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + "github.com/ethereum-optimism/optimism/op-service/sources" + suptypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +var ( + Version = "v0.0.1" + GitCommit = "" + GitDate = "" +) + +var ( + L2RPCFlag = &cli.StringFlag{ + Name: "l2-rpc", + Usage: "L2 RPC endpoint URL", + Required: true, + EnvVars: []string{"L2_RPC"}, + } + FilterRPCFlag = &cli.StringFlag{ + Name: "filter-rpc", + Usage: "Filter service RPC endpoint URL", + Required: true, + EnvVars: []string{"FILTER_RPC"}, + } + ChainIDFlag = &cli.Uint64Flag{ + Name: "chain-id", + Usage: "Chain ID to test", + Required: true, + EnvVars: []string{"CHAIN_ID"}, + } + NumQueriesFlag = &cli.IntFlag{ + Name: "num-queries", + Usage: "Number of queries to run (0 = run forever)", + Value: 100, + EnvVars: []string{"NUM_QUERIES"}, + } + BlockRangeFlag = &cli.IntFlag{ + Name: "block-range", + Usage: "Number of recent blocks to sample from", + Value: 100, + EnvVars: []string{"BLOCK_RANGE"}, + } + QueryIntervalFlag = &cli.StringFlag{ + Name: "query-interval", + Usage: "Interval between queries (e.g., 100ms, 1s)", + Value: "500ms", + EnvVars: []string{"QUERY_INTERVAL"}, + } + MetricsPortFlag = &cli.IntFlag{ + Name: "metrics.port", + Usage: "Port for Prometheus metrics server", + Value: 7301, + EnvVars: []string{"METRICS_PORT"}, + } + MetricsEnabledFlag = &cli.BoolFlag{ + Name: "metrics.enabled", + Usage: "Enable Prometheus metrics", + Value: true, + EnvVars: []string{"METRICS_ENABLED"}, + } +) + +// Metrics for the spammer +type SpammerMetrics struct { + queriesTotal *prometheus.CounterVec + queryLatency *prometheus.HistogramVec + errorsTotal prometheus.Counter + uptime prometheus.Gauge + currentBlockRange prometheus.Gauge +} + +// RecentQuery stores info about a recent query for debugging +type RecentQuery struct { + Timestamp string `json:"timestamp"` + ChainID eth.ChainID `json:"chain_id"` + BlockNumber uint64 `json:"block_number"` + TxHash string `json:"tx_hash"` + LogIndex uint `json:"log_index"` + Address string `json:"address"` + QueryType string `json:"query_type"` // "valid" or "invalid" + Result string `json:"result"` // "accepted" or "rejected" +} + +// RecentQueries is a thread-safe ring buffer of recent queries +type RecentQueries struct { + mu sync.RWMutex + queries []RecentQuery + maxSize int +} + +func NewRecentQueries(size int) *RecentQueries { + return &RecentQueries{ + queries: make([]RecentQuery, 0, size), + maxSize: size, + } +} + +func (r *RecentQueries) Add(q RecentQuery) { + r.mu.Lock() + defer r.mu.Unlock() + if len(r.queries) >= r.maxSize { + // Remove oldest + r.queries = r.queries[1:] + } + r.queries = append(r.queries, q) +} + +func (r *RecentQueries) Get() []RecentQuery { + r.mu.RLock() + defer r.mu.RUnlock() + // Return in reverse order (newest first) + result := make([]RecentQuery, len(r.queries)) + for i, q := range r.queries { + result[len(r.queries)-1-i] = q + } + return result +} + +func newSpammerMetrics(reg prometheus.Registerer) *SpammerMetrics { + m := &SpammerMetrics{ + queriesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "filter_spammer", + Name: "queries_total", + Help: "Total number of queries sent", + }, []string{"type", "result"}), + queryLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "filter_spammer", + Name: "query_latency_seconds", + Help: "Latency of filter queries", + Buckets: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5}, + }, []string{"type"}), + errorsTotal: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "filter_spammer", + Name: "errors_total", + Help: "Total number of unexpected errors", + }), + uptime: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "filter_spammer", + Name: "up", + Help: "1 if spammer is running", + }), + currentBlockRange: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "filter_spammer", + Name: "block_range", + Help: "Number of blocks in sample range", + }), + } + + reg.MustRegister(m.queriesTotal) + reg.MustRegister(m.queryLatency) + reg.MustRegister(m.errorsTotal) + reg.MustRegister(m.uptime) + reg.MustRegister(m.currentBlockRange) + + return m +} + +func main() { + oplog.SetupDefaults() + + app := cli.NewApp() + app.Name = "filter-spammer" + app.Usage = "Spam the interop filter service with queries to validate its behavior" + app.Version = opservice.FormatVersion(Version, GitCommit, GitDate, "") + app.Flags = cliapp.ProtectFlags(append([]cli.Flag{ + L2RPCFlag, + FilterRPCFlag, + ChainIDFlag, + NumQueriesFlag, + BlockRangeFlag, + QueryIntervalFlag, + MetricsPortFlag, + MetricsEnabledFlag, + }, oplog.CLIFlags("SPAMMER")...)) + app.Action = run + + ctx := ctxinterrupt.WithSignalWaiterMain(context.Background()) + err := app.RunContext(ctx, os.Args) + if err != nil { + log.Crit("Application failed", "err", err) + } +} + +func run(cliCtx *cli.Context) error { + logger := oplog.NewLogger(os.Stdout, oplog.ReadCLIConfig(cliCtx)) + + l2RPC := cliCtx.String(L2RPCFlag.Name) + filterRPC := cliCtx.String(FilterRPCFlag.Name) + chainID := cliCtx.Uint64(ChainIDFlag.Name) + numQueries := cliCtx.Int(NumQueriesFlag.Name) + blockRange := cliCtx.Int(BlockRangeFlag.Name) + queryIntervalStr := cliCtx.String(QueryIntervalFlag.Name) + metricsPort := cliCtx.Int(MetricsPortFlag.Name) + metricsEnabled := cliCtx.Bool(MetricsEnabledFlag.Name) + + queryInterval, err := time.ParseDuration(queryIntervalStr) + if err != nil { + return fmt.Errorf("invalid query-interval: %w", err) + } + + ctx := cliCtx.Context + + // Setup metrics and recent queries tracking + var metrics *SpammerMetrics + recentQueries := NewRecentQueries(10) // Track last 10 queries + + if metricsEnabled { + reg := prometheus.NewRegistry() + metrics = newSpammerMetrics(reg) + metrics.uptime.Set(1) + metrics.currentBlockRange.Set(float64(blockRange)) + + // Start metrics server with /recent endpoint + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) + mux.HandleFunc("/recent", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(recentQueries.Get()) + }) + server := &http.Server{ + Addr: fmt.Sprintf(":%d", metricsPort), + Handler: mux, + } + go func() { + logger.Info("Starting metrics server", "port", metricsPort) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("Metrics server error", "err", err) + } + }() + defer func() { _ = server.Shutdown(context.Background()) }() + } + + logger.Info("Starting filter spammer", + "l2RPC", l2RPC, + "filterRPC", filterRPC, + "chainID", chainID, + "numQueries", numQueries, + "blockRange", blockRange, + "queryInterval", queryInterval, + "metricsEnabled", metricsEnabled, + "metricsPort", metricsPort, + ) + + // Connect to L2 RPC + l2Client, err := client.NewRPC(ctx, logger, l2RPC) + if err != nil { + return fmt.Errorf("failed to connect to L2 RPC: %w", err) + } + defer l2Client.Close() + + ethClient, err := sources.NewEthClient(l2Client, logger, nil, &sources.EthClientConfig{ + MaxRequestsPerBatch: 20, + MaxConcurrentRequests: 10, + TrustRPC: true, + MustBePostMerge: false, + RPCProviderKind: sources.RPCKindBasic, + ReceiptsCacheSize: 100, + TransactionsCacheSize: 100, + HeadersCacheSize: 100, + PayloadsCacheSize: 100, + BlockRefsCacheSize: 100, + }) + if err != nil { + return fmt.Errorf("failed to create eth client: %w", err) + } + defer ethClient.Close() + + // Connect to filter RPC + filterClient, err := rpc.DialContext(ctx, filterRPC) + if err != nil { + return fmt.Errorf("failed to connect to filter RPC: %w", err) + } + defer filterClient.Close() + + // Check filter is ready (not in failsafe) + var failsafe bool + if err := filterClient.CallContext(ctx, &failsafe, "admin_getFailsafeEnabled"); err != nil { + return fmt.Errorf("failed to check filter failsafe: %w", err) + } + if failsafe { + return errors.New("filter service is in failsafe mode") + } + logger.Info("Filter service responding", "failsafe", failsafe) + + // Wait for backfill to complete by doing a test query + // The filter returns "service not ready, backfill in progress" until ready + logger.Info("Waiting for filter backfill to complete...") + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + // Try a dummy checkAccessList call with empty access list + // If backfill is in progress, it will return "backfill in progress" + // If ready, it will succeed (empty list = valid) + var result any + err := filterClient.CallContext(ctx, &result, "supervisor_checkAccessList", []any{}, "finalized", map[string]any{ + "chainID": fmt.Sprintf("0x%x", chainID), + "timestamp": "0x0", + "blockNum": "0x0", + }) + if err == nil { + logger.Info("Filter backfill complete, ready to spam") + break + } + errStr := err.Error() + if strings.Contains(errStr, "backfill") || strings.Contains(errStr, "uninitialized") { + logger.Debug("Backfill still in progress, waiting...", "err", errStr) + time.Sleep(2 * time.Second) + continue + } + // Other errors mean backfill is done but query failed for other reason + logger.Info("Filter backfill complete, ready to spam", "testErr", errStr) + break + } + + // Get current head + head, err := ethClient.InfoByLabel(ctx, eth.Unsafe) + if err != nil { + return fmt.Errorf("failed to get head: %w", err) + } + headNum := head.NumberU64() + logger.Info("Current L2 head", "block", headNum) + + // Calculate block range to sample from + startBlock := headNum - uint64(blockRange) + if startBlock < 1 { + startBlock = 1 + } + + spammer := &Spammer{ + logger: logger, + ethClient: ethClient, + filterClient: filterClient, + chainID: eth.ChainIDFromUInt64(chainID), + startBlock: startBlock, + endBlock: headNum, + metrics: metrics, + recentQueries: recentQueries, + } + + // Run queries + ticker := time.NewTicker(queryInterval) + defer ticker.Stop() + + validQueries := 0 + invalidQueries := 0 + errorCount := 0 + + for i := 0; numQueries == 0 || i < numQueries; i++ { + select { + case <-ctx.Done(): + logger.Info("Shutting down", "validQueries", validQueries, "invalidQueries", invalidQueries, "errors", errorCount) + return nil + case <-ticker.C: + // Alternate between valid and invalid queries + if i%2 == 0 { + if err := spammer.RunValidQuery(ctx); err != nil { + logger.Error("Valid query failed unexpectedly", "err", err, "query", i) + errorCount++ + if metrics != nil { + metrics.errorsTotal.Inc() + } + if errorCount > 10 { + return fmt.Errorf("too many errors (%d): last error: %w", errorCount, err) + } + } else { + validQueries++ + logger.Debug("Valid query passed", "query", i) + } + } else { + if err := spammer.RunInvalidQuery(ctx); err != nil { + logger.Error("Invalid query test failed", "err", err, "query", i) + errorCount++ + if metrics != nil { + metrics.errorsTotal.Inc() + } + if errorCount > 10 { + return fmt.Errorf("too many errors (%d): last error: %w", errorCount, err) + } + } else { + invalidQueries++ + logger.Debug("Invalid query rejected as expected", "query", i) + } + } + + if (i+1)%20 == 0 { + logger.Info("Progress", "queries", i+1, "valid", validQueries, "invalid", invalidQueries, "errors", errorCount) + } + } + } + + logger.Info("Spammer completed successfully", + "validQueries", validQueries, + "invalidQueries", invalidQueries, + "errors", errorCount, + ) + + if errorCount > 0 { + return fmt.Errorf("completed with %d errors", errorCount) + } + return nil +} + +// Spammer handles the spam testing logic +type Spammer struct { + logger log.Logger + ethClient *sources.EthClient + filterClient *rpc.Client + chainID eth.ChainID + startBlock uint64 + endBlock uint64 + metrics *SpammerMetrics + recentQueries *RecentQueries +} + +// RunValidQuery fetches a random log and verifies the filter accepts it +func (s *Spammer) RunValidQuery(ctx context.Context) error { + start := time.Now() + + // Pick a random block + blockNum := s.startBlock + uint64(rand.Int63n(int64(s.endBlock-s.startBlock+1))) + + // Get block info + block, err := s.ethClient.InfoByNumber(ctx, blockNum) + if err != nil { + return fmt.Errorf("failed to get block %d: %w", blockNum, err) + } + + // Get receipts + _, receipts, err := s.ethClient.FetchReceipts(ctx, block.Hash()) + if err != nil { + return fmt.Errorf("failed to get receipts for block %d: %w", blockNum, err) + } + + // Find a block with logs + var foundLog bool + for _, receipt := range receipts { + if len(receipt.Logs) == 0 { + continue + } + + // Pick a random log from this receipt + logIdx := rand.Intn(len(receipt.Logs)) + log := receipt.Logs[logIdx] + + // Compute the log hash + logHash := LogToLogHash(log) + + // Create access entry + access := suptypes.ChecksumArgs{ + BlockNumber: blockNum, + LogIndex: uint32(log.Index), + Timestamp: block.Time(), + ChainID: s.chainID, + LogHash: logHash, + }.Access() + + // Encode and query + entries := suptypes.EncodeAccessList([]suptypes.Access{access}) + + execDesc := suptypes.ExecutingDescriptor{ + ChainID: s.chainID, + Timestamp: block.Time(), + } + + err := s.filterClient.CallContext(ctx, nil, "supervisor_checkAccessList", entries, suptypes.LocalUnsafe, execDesc) + + // Record metrics + if s.metrics != nil { + s.metrics.queryLatency.WithLabelValues("valid").Observe(time.Since(start).Seconds()) + } + + // Determine result + result := "accepted" + if err != nil { + result = "rejected" + if s.metrics != nil { + s.metrics.queriesTotal.WithLabelValues("valid", "rejected").Inc() + } + } else { + if s.metrics != nil { + s.metrics.queriesTotal.WithLabelValues("valid", "accepted").Inc() + } + } + + // Record to recent queries + if s.recentQueries != nil { + s.recentQueries.Add(RecentQuery{ + Timestamp: time.Now().Format("15:04:05"), + ChainID: s.chainID, + BlockNumber: blockNum, + TxHash: receipt.TxHash.Hex(), + LogIndex: log.Index, + Address: log.Address.Hex(), + QueryType: "valid", + Result: result, + }) + } + + if err != nil { + // Check if this is a "skipped data" error (block outside filter's range) + // This means the block is too old for the filter to search, not a real error + if strings.Contains(err.Error(), "skipped data") { + s.logger.Debug("Block outside filter's searchable range, retrying with different block", + "block", blockNum, "err", err) + return s.RunValidQuery(ctx) // Try again with a different block + } + return fmt.Errorf("valid query rejected: block=%d logIdx=%d err=%w", blockNum, log.Index, err) + } + + foundLog = true + break + } + + if !foundLog { + // Block had no logs, try again with a different block + return s.RunValidQuery(ctx) + } + + return nil +} + +// RunInvalidQuery creates an invalid query and verifies the filter rejects it +func (s *Spammer) RunInvalidQuery(ctx context.Context) error { + start := time.Now() + + // Pick a random block + blockNum := s.startBlock + uint64(rand.Int63n(int64(s.endBlock-s.startBlock+1))) + + // Get block info + block, err := s.ethClient.InfoByNumber(ctx, blockNum) + if err != nil { + return fmt.Errorf("failed to get block %d: %w", blockNum, err) + } + + // Get receipts + _, receipts, err := s.ethClient.FetchReceipts(ctx, block.Hash()) + if err != nil { + return fmt.Errorf("failed to get receipts for block %d: %w", blockNum, err) + } + + // Find a block with logs + var foundLog bool + for _, receipt := range receipts { + if len(receipt.Logs) == 0 { + continue + } + + // Pick a random log from this receipt + logIdx := rand.Intn(len(receipt.Logs)) + log := receipt.Logs[logIdx] + + // Compute the log hash + logHash := LogToLogHash(log) + + // Create access entry with WRONG checksum (flip a byte) + access := suptypes.ChecksumArgs{ + BlockNumber: blockNum, + LogIndex: uint32(log.Index), + Timestamp: block.Time(), + ChainID: s.chainID, + LogHash: logHash, + }.Access() + + // Corrupt the checksum (flip byte at index 10, preserving prefix byte at 0) + access.Checksum[10] ^= 0xFF + + // Encode and query + entries := suptypes.EncodeAccessList([]suptypes.Access{access}) + + execDesc := suptypes.ExecutingDescriptor{ + ChainID: s.chainID, + Timestamp: block.Time(), + } + + err := s.filterClient.CallContext(ctx, nil, "supervisor_checkAccessList", entries, suptypes.LocalUnsafe, execDesc) + + // Record metrics + if s.metrics != nil { + s.metrics.queryLatency.WithLabelValues("invalid").Observe(time.Since(start).Seconds()) + } + + // Determine result + result := "rejected" + if err == nil { + result = "accepted" + if s.metrics != nil { + s.metrics.queriesTotal.WithLabelValues("invalid", "accepted").Inc() + } + } else { + // Error expected - query was correctly rejected + if s.metrics != nil { + s.metrics.queriesTotal.WithLabelValues("invalid", "rejected").Inc() + } + } + + // Record to recent queries + if s.recentQueries != nil { + s.recentQueries.Add(RecentQuery{ + Timestamp: time.Now().Format("15:04:05"), + ChainID: s.chainID, + BlockNumber: blockNum, + TxHash: receipt.TxHash.Hex(), + LogIndex: log.Index, + Address: log.Address.Hex(), + QueryType: "invalid", + Result: result, + }) + } + + if err == nil { + return fmt.Errorf("invalid query was accepted: block=%d logIdx=%d", blockNum, log.Index) + } + + foundLog = true + break + } + + if !foundLog { + // Block had no logs, try again with a different block + return s.RunInvalidQuery(ctx) + } + + return nil +} + +// LogToLogHash computes the log hash used in LogsDB +// This matches processors.LogToLogHash +func LogToLogHash(l *gethtypes.Log) common.Hash { + // Compute payload hash from topics and data + msg := make([]byte, 0) + for _, topic := range l.Topics { + msg = append(msg, topic.Bytes()...) + } + msg = append(msg, l.Data...) + payloadHash := crypto.Keccak256Hash(msg) + + // Compute log hash + return suptypes.PayloadHashToLogHash(payloadHash, l.Address) +} diff --git a/op-interop-filter/filter/backend.go b/op-interop-filter/filter/backend.go new file mode 100644 index 00000000000..f108387e534 --- /dev/null +++ b/op-interop-filter/filter/backend.go @@ -0,0 +1,176 @@ +package filter + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum-optimism/optimism/op-interop-filter/metrics" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +var ( + ErrUnknownChain = errors.New("unknown chain") +) + +// Backend coordinates chain ingesters and handles the failsafe state +type Backend struct { + log log.Logger + metrics metrics.Metricer + cfg *Config + + chains map[eth.ChainID]*ChainIngester + chainsMu sync.RWMutex + + failsafe atomic.Bool +} + +// NewBackend creates a new Backend instance +func NewBackend(ctx context.Context, logger log.Logger, m metrics.Metricer, cfg *Config) (*Backend, error) { + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: make(map[eth.ChainID]*ChainIngester), + } + + // Create chain instances (but don't start them yet) + for _, rpcURL := range cfg.L2RPCs { + // Query chain ID from RPC + client, err := ethclient.DialContext(ctx, rpcURL) + if err != nil { + return nil, fmt.Errorf("failed to dial RPC %s: %w", rpcURL, err) + } + chainIDBig, err := client.ChainID(ctx) + client.Close() + if err != nil { + return nil, fmt.Errorf("failed to get chain ID from %s: %w", rpcURL, err) + } + chainIDUint := chainIDBig.Uint64() + chainID := eth.ChainIDFromUInt64(chainIDUint) + + chain, err := NewChainIngester(ctx, logger, m, chainID, rpcURL, cfg) + if err != nil { + return nil, fmt.Errorf("failed to create chain %d: %w", chainIDUint, err) + } + b.chains[chainID] = chain + logger.Info("Created chain ingester", "chainID", chainIDUint, "rpc", rpcURL) + } + + return b, nil +} + +// Start starts all chain ingesters +func (b *Backend) Start(ctx context.Context) error { + b.log.Info("Starting backend", "chains", len(b.chains)) + for chainID, chain := range b.chains { + if err := chain.Start(ctx, b.onReorg); err != nil { + return fmt.Errorf("failed to start chain %s: %w", chainID, err) + } + } + return nil +} + +// Stop stops all chain ingesters +func (b *Backend) Stop(ctx context.Context) error { + b.log.Info("Stopping backend") + var result error + for chainID, chain := range b.chains { + if err := chain.Stop(ctx); err != nil { + result = errors.Join(result, fmt.Errorf("failed to stop chain %s: %w", chainID, err)) + } + } + return result +} + +// onReorg is called when a chain detects a reorg +func (b *Backend) onReorg(chainID eth.ChainID, err error) { + b.log.Error("Reorg detected, enabling failsafe", "chainID", chainID, "err", err) + b.failsafe.Store(true) + b.metrics.RecordFailsafeEnabled(true) + chainIDUint, _ := chainID.Uint64() + b.metrics.RecordReorgDetected(chainIDUint) +} + +// FailsafeEnabled returns whether failsafe is enabled +func (b *Backend) FailsafeEnabled() bool { + return b.failsafe.Load() +} + +// Ready returns whether all chains have finished backfill +func (b *Backend) Ready() bool { + b.chainsMu.RLock() + defer b.chainsMu.RUnlock() + for _, chain := range b.chains { + if !chain.Ready() { + return false + } + } + return true +} + +// CheckAccessList validates the given access list entries +func (b *Backend) CheckAccessList(ctx context.Context, inboxEntries []common.Hash, + minSafety types.SafetyLevel, execDescriptor types.ExecutingDescriptor) error { + + // Check failsafe first + if b.FailsafeEnabled() { + b.metrics.RecordCheckAccessList(false) + return types.ErrFailsafeEnabled + } + + // Check if we're ready (all chains backfilled) + if !b.Ready() { + b.metrics.RecordCheckAccessList(false) + return types.ErrUninitialized + } + + // Only support "unsafe" safety level - we don't track safety levels + if minSafety != types.LocalUnsafe { + b.metrics.RecordCheckAccessList(false) + return fmt.Errorf("unsupported safety level %q: only %q is supported", minSafety, types.LocalUnsafe) + } + + // Parse and validate each access entry + entries := inboxEntries + for len(entries) > 0 { + if err := ctx.Err(); err != nil { + b.metrics.RecordCheckAccessList(false) + return fmt.Errorf("context cancelled: %w", err) + } + + remaining, access, err := types.ParseAccess(entries) + if err != nil { + b.metrics.RecordCheckAccessList(false) + return fmt.Errorf("failed to parse access: %w", err) + } + entries = remaining + + // Get chain for this access entry + b.chainsMu.RLock() + chain, ok := b.chains[access.ChainID] + b.chainsMu.RUnlock() + if !ok { + b.metrics.RecordCheckAccessList(false) + return fmt.Errorf("%w: %s", ErrUnknownChain, access.ChainID) + } + + // Validate via LogsDB - propagate actual error from LogsDB + if err := chain.Contains(access); err != nil { + b.log.Debug("Access validation failed", "chainID", access.ChainID, + "blockNum", access.BlockNumber, "logIdx", access.LogIndex, "err", err) + b.metrics.RecordCheckAccessList(false) + return err + } + } + + b.metrics.RecordCheckAccessList(true) + return nil +} diff --git a/op-interop-filter/filter/backend_test.go b/op-interop-filter/filter/backend_test.go new file mode 100644 index 00000000000..249a9df298d --- /dev/null +++ b/op-interop-filter/filter/backend_test.go @@ -0,0 +1,202 @@ +package filter + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/require" + + "github.com/ethereum-optimism/optimism/op-interop-filter/metrics" + "github.com/ethereum-optimism/optimism/op-service/eth" + suptypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +func TestBackend_FailsafeEnabled(t *testing.T) { + logger := log.New() + m := metrics.NoopMetrics + cfg := &Config{} + + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: make(map[eth.ChainID]*ChainIngester), + } + + // Initially failsafe should be disabled + require.False(t, b.FailsafeEnabled()) + + // Simulate reorg detection + chainID := eth.ChainIDFromUInt64(10) + b.onReorg(chainID, nil) + + // Now failsafe should be enabled + require.True(t, b.FailsafeEnabled()) +} + +func TestBackend_CheckAccessList_FailsafeEnabled(t *testing.T) { + logger := log.New() + m := metrics.NoopMetrics + cfg := &Config{} + + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: make(map[eth.ChainID]*ChainIngester), + } + + // Enable failsafe + b.failsafe.Store(true) + + // CheckAccessList should return ErrFailsafeEnabled + err := b.CheckAccessList(context.Background(), nil, suptypes.LocalUnsafe, suptypes.ExecutingDescriptor{}) + require.ErrorIs(t, err, suptypes.ErrFailsafeEnabled) +} + +func TestBackend_CheckAccessList_NotReady(t *testing.T) { + logger := log.New() + m := metrics.NoopMetrics + cfg := &Config{} + + chainID := eth.ChainIDFromUInt64(10) + chain := &ChainIngester{ + log: logger, + chainID: chainID, + } + // Chain is not ready (ready is false by default) + + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: map[eth.ChainID]*ChainIngester{chainID: chain}, + } + + // CheckAccessList should return ErrUninitialized when chain is not ready + err := b.CheckAccessList(context.Background(), nil, suptypes.LocalUnsafe, suptypes.ExecutingDescriptor{}) + require.ErrorIs(t, err, suptypes.ErrUninitialized) +} + +func TestBackend_CheckAccessList_UnknownChain(t *testing.T) { + logger := log.New() + m := metrics.NoopMetrics + cfg := &Config{} + + // Create a ready chain for chainID 10 + chainID := eth.ChainIDFromUInt64(10) + chain := &ChainIngester{ + log: logger, + chainID: chainID, + } + chain.ready.Store(true) + + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: map[eth.ChainID]*ChainIngester{chainID: chain}, + } + + // Create access entries for an unknown chain (chainID 999) + // Using raw encoding for a simple access entry + unknownChainAccess := createMockAccessEntry(999, 100, 0, 12345, common.Hash{}) + + err := b.CheckAccessList(context.Background(), unknownChainAccess, suptypes.LocalUnsafe, suptypes.ExecutingDescriptor{}) + require.ErrorIs(t, err, ErrUnknownChain) +} + +func TestBackend_Ready(t *testing.T) { + logger := log.New() + m := metrics.NoopMetrics + cfg := &Config{} + + chain1 := &ChainIngester{log: logger, chainID: eth.ChainIDFromUInt64(10)} + chain2 := &ChainIngester{log: logger, chainID: eth.ChainIDFromUInt64(8453)} + + b := &Backend{ + log: logger, + metrics: m, + cfg: cfg, + chains: map[eth.ChainID]*ChainIngester{ + eth.ChainIDFromUInt64(10): chain1, + eth.ChainIDFromUInt64(8453): chain2, + }, + } + + // No chains ready + require.False(t, b.Ready()) + + // One chain ready + chain1.ready.Store(true) + require.False(t, b.Ready()) + + // Both chains ready + chain2.ready.Store(true) + require.True(t, b.Ready()) +} + +// createMockAccessEntry creates a mock access entry for testing +// This follows the encoding format from supervisor types: +// Hash 0: byte 0 = PrefixLookup (1), bytes 1-3 = zeros, bytes 4-12 = chainID, +// +// bytes 12-20 = blockNum, bytes 20-28 = timestamp, bytes 28-32 = logIdx +// +// Hash 1: byte 0 = PrefixChecksum (3), rest = checksum data +func createMockAccessEntry(chainID uint64, blockNum uint64, logIdx uint32, timestamp uint64, checksum common.Hash) []common.Hash { + const ( + PrefixLookup = 1 + PrefixChecksum = 3 + ) + + var entries []common.Hash + + // First hash: lookup entry + var lookup common.Hash + lookup[0] = PrefixLookup + // bytes 1-3 are zero padding (already zero) + // bytes 4-12: chainID (big endian) + lookup[4] = byte(chainID >> 56) + lookup[5] = byte(chainID >> 48) + lookup[6] = byte(chainID >> 40) + lookup[7] = byte(chainID >> 32) + lookup[8] = byte(chainID >> 24) + lookup[9] = byte(chainID >> 16) + lookup[10] = byte(chainID >> 8) + lookup[11] = byte(chainID) + // bytes 12-20: blockNum (big endian) + lookup[12] = byte(blockNum >> 56) + lookup[13] = byte(blockNum >> 48) + lookup[14] = byte(blockNum >> 40) + lookup[15] = byte(blockNum >> 32) + lookup[16] = byte(blockNum >> 24) + lookup[17] = byte(blockNum >> 16) + lookup[18] = byte(blockNum >> 8) + lookup[19] = byte(blockNum) + // bytes 20-28: timestamp (big endian) + lookup[20] = byte(timestamp >> 56) + lookup[21] = byte(timestamp >> 48) + lookup[22] = byte(timestamp >> 40) + lookup[23] = byte(timestamp >> 32) + lookup[24] = byte(timestamp >> 24) + lookup[25] = byte(timestamp >> 16) + lookup[26] = byte(timestamp >> 8) + lookup[27] = byte(timestamp) + // bytes 28-32: logIdx (big endian uint32) + lookup[28] = byte(logIdx >> 24) + lookup[29] = byte(logIdx >> 16) + lookup[30] = byte(logIdx >> 8) + lookup[31] = byte(logIdx) + entries = append(entries, lookup) + + // Second hash: checksum entry + var checksumEntry common.Hash + checksumEntry[0] = PrefixChecksum + // Copy checksum data to bytes 1-31 + copy(checksumEntry[1:], checksum[1:]) + entries = append(entries, checksumEntry) + + return entries +} diff --git a/op-interop-filter/filter/chain_ingester.go b/op-interop-filter/filter/chain_ingester.go new file mode 100644 index 00000000000..0e2fb6d3154 --- /dev/null +++ b/op-interop-filter/filter/chain_ingester.go @@ -0,0 +1,435 @@ +package filter + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + + "github.com/ethereum-optimism/optimism/op-interop-filter/metrics" + "github.com/ethereum-optimism/optimism/op-service/client" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/sources" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/db/logs" + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/processors" + suptypes "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +const ( + // defaultBlockTime is used to estimate blocks per hour if we can't determine it + defaultBlockTime = 2 * time.Second + // httpPollInterval for polling new blocks + httpPollInterval = 2 * time.Second +) + +// ReorgCallback is called when a reorg is detected +type ReorgCallback func(chainID eth.ChainID, err error) + +// ChainIngester manages block ingestion and LogsDB for a single chain +type ChainIngester struct { + log log.Logger + metrics metrics.Metricer + chainID eth.ChainID + rpcURL string + cfg *Config + + rpcClient client.RPC + ethClient *sources.EthClient + logsDB *logs.DB + + ready atomic.Bool + stopped atomic.Bool + + cancel context.CancelFunc + wg sync.WaitGroup +} + +// NewChainIngester creates a new ChainIngester instance +func NewChainIngester(ctx context.Context, logger log.Logger, m metrics.Metricer, + chainID eth.ChainID, rpcURL string, cfg *Config) (*ChainIngester, error) { + + c := &ChainIngester{ + log: logger.New("chainID", chainID), + metrics: m, + chainID: chainID, + rpcURL: rpcURL, + cfg: cfg, + } + return c, nil +} + +// Start starts the chain ingester +func (c *ChainIngester) Start(ctx context.Context, onReorg ReorgCallback) error { + c.log.Info("Starting chain ingester") + + // Create RPC client + rpcClient, err := client.NewRPC(ctx, c.log, c.rpcURL, + client.WithHttpPollInterval(httpPollInterval)) + if err != nil { + return fmt.Errorf("failed to create RPC client: %w", err) + } + c.rpcClient = rpcClient + + // Create eth client for fetching blocks/receipts + ethClient, err := sources.NewEthClient( + rpcClient, + c.log, + nil, // no metrics for now + &sources.EthClientConfig{ + MaxRequestsPerBatch: 20, + MaxConcurrentRequests: 10, + TrustRPC: true, + MustBePostMerge: false, + RPCProviderKind: sources.RPCKindBasic, + ReceiptsCacheSize: 100, + TransactionsCacheSize: 100, + HeadersCacheSize: 100, + PayloadsCacheSize: 100, + BlockRefsCacheSize: 100, + }, + ) + if err != nil { + return fmt.Errorf("failed to create eth client: %w", err) + } + c.ethClient = ethClient + + // Initialize LogsDB + if err := c.initLogsDB(); err != nil { + return fmt.Errorf("failed to init LogsDB: %w", err) + } + + // Start ingestion in background + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + + c.wg.Add(1) + go func() { + defer c.wg.Done() + c.runIngestion(ctx, onReorg) + }() + + return nil +} + +// Stop stops the chain ingester +func (c *ChainIngester) Stop(ctx context.Context) error { + if !c.stopped.CompareAndSwap(false, true) { + return nil + } + c.log.Info("Stopping chain ingester") + if c.cancel != nil { + c.cancel() + } + c.wg.Wait() + if c.ethClient != nil { + c.ethClient.Close() + } + if c.logsDB != nil { + c.logsDB.Close() + } + return nil +} + +// Ready returns whether backfill is complete +func (c *ChainIngester) Ready() bool { + return c.ready.Load() +} + +// Contains validates that a log exists in the LogsDB +func (c *ChainIngester) Contains(access suptypes.Access) error { + query := suptypes.ContainsQuery{ + Timestamp: access.Timestamp, + BlockNum: access.BlockNumber, + LogIdx: access.LogIndex, + Checksum: access.Checksum, + } + _, err := c.logsDB.Contains(query) + return err +} + +func (c *ChainIngester) initLogsDB() error { + chainIDUint, _ := c.chainID.Uint64() + + var dbPath string + if c.cfg.DataDir != "" { + chainDir := filepath.Join(c.cfg.DataDir, fmt.Sprintf("chain-%d", chainIDUint)) + if err := os.MkdirAll(chainDir, 0755); err != nil { + return fmt.Errorf("failed to create chain dir: %w", err) + } + dbPath = filepath.Join(chainDir, "logs.db") + } else { + // Use fresh temp directory if no data dir specified + // Remove any stale data from previous runs + tmpDir := filepath.Join(os.TempDir(), "op-interop-filter", fmt.Sprintf("chain-%d", chainIDUint)) + if err := os.RemoveAll(tmpDir); err != nil { + c.log.Warn("Failed to clean temp dir", "path", tmpDir, "err", err) + } + if err := os.MkdirAll(tmpDir, 0755); err != nil { + return fmt.Errorf("failed to create temp dir: %w", err) + } + dbPath = filepath.Join(tmpDir, "logs.db") + c.log.Info("Using temporary directory for LogsDB", "path", tmpDir) + } + + // Create LogsDB + dbMetrics := &logsDBMetrics{chainID: chainIDUint, m: c.metrics} + db, err := logs.NewFromFile(c.log, dbMetrics, c.chainID, dbPath, true) + if err != nil { + return err + } + c.logsDB = db + return nil +} + +func (c *ChainIngester) runIngestion(ctx context.Context, onReorg ReorgCallback) { + c.log.Info("Starting block ingestion") + + // Get current head + head, err := c.ethClient.InfoByLabel(ctx, eth.Unsafe) + if err != nil { + c.log.Error("Failed to get head block", "err", err) + onReorg(c.chainID, fmt.Errorf("failed to get head: %w", err)) + return + } + c.log.Info("Current head", "number", head.NumberU64(), "hash", head.Hash()) + + // Calculate start block (24 hours back) + startBlock := c.calculateStartBlock(head) + c.log.Info("Backfill range", "start", startBlock, "end", head.NumberU64()) + + // Initialize LogsDB with starting block + if err := c.initializeLogsDBFromBlock(ctx, startBlock); err != nil { + c.log.Error("Failed to initialize LogsDB", "err", err) + onReorg(c.chainID, fmt.Errorf("failed to initialize LogsDB: %w", err)) + return + } + + // Record the starting block + chainIDUint, _ := c.chainID.Uint64() + c.metrics.RecordLogsDBFirstBlock(chainIDUint, startBlock) + + // Backfill historical blocks + headNum := head.NumberU64() + if err := c.backfill(ctx, startBlock, headNum); err != nil { + c.log.Error("Backfill failed", "err", err) + onReorg(c.chainID, fmt.Errorf("backfill failed: %w", err)) + return + } + + c.log.Info("Backfill complete, starting live ingestion") + c.ready.Store(true) + c.metrics.RecordChainReady(chainIDUint, true) + + // Subscribe to new blocks, starting from the head we backfilled to + c.subscribeNewBlocks(ctx, onReorg, headNum) +} + +func (c *ChainIngester) calculateStartBlock(head eth.BlockInfo) uint64 { + // Estimate blocks in backfill period based on configured duration + backfillBlocks := uint64(c.cfg.BackfillDuration / defaultBlockTime) + + headNum := head.NumberU64() + if headNum <= backfillBlocks { + return 1 // Start from block 1 (not genesis) + } + return headNum - backfillBlocks +} + +func (c *ChainIngester) initializeLogsDBFromBlock(ctx context.Context, blockNum uint64) error { + // Get the block before our start block to use as the "sealed" starting point + if blockNum == 0 { + blockNum = 1 + } + startBlockNum := blockNum - 1 + + block, err := c.ethClient.InfoByNumber(ctx, startBlockNum) + if err != nil { + return fmt.Errorf("failed to get start block %d: %w", startBlockNum, err) + } + + // Seal this block as the starting point for an empty DB + // When the DB is empty, SealBlock accepts any block to initialize it + blockID := eth.BlockID{Hash: block.Hash(), Number: startBlockNum} + return c.logsDB.SealBlock(block.ParentHash(), blockID, block.Time()) +} + +func (c *ChainIngester) backfill(ctx context.Context, startBlock, endBlock uint64) error { + total := endBlock - startBlock + 1 + c.log.Info("Starting backfill", "blocks", total, "from", startBlock, "to", endBlock) + + chainIDUint, _ := c.chainID.Uint64() + + // Log every 10% or every 100 blocks, whichever is more frequent + logInterval := total / 10 + if logInterval < 100 { + logInterval = 100 + } + if logInterval > 1000 { + logInterval = 1000 + } + + lastLogTime := time.Now() + + for blockNum := startBlock; blockNum <= endBlock; blockNum++ { + if ctx.Err() != nil { + return ctx.Err() + } + + if err := c.ingestBlock(ctx, blockNum); err != nil { + return fmt.Errorf("failed to ingest block %d: %w", blockNum, err) + } + + // Update progress + progress := blockNum - startBlock + 1 + c.metrics.RecordBackfillProgress(chainIDUint, progress, total) + + // Log progress periodically (by count or time) + if progress%logInterval == 0 || time.Since(lastLogTime) > 10*time.Second { + c.log.Info("Backfill progress", + "block", blockNum, + "progress", fmt.Sprintf("%d/%d (%.1f%%)", progress, total, float64(progress)/float64(total)*100)) + lastLogTime = time.Now() + } + } + + return nil +} + +func (c *ChainIngester) subscribeNewBlocks(ctx context.Context, onReorg ReorgCallback, startBlock uint64) { + // Simple polling loop for new blocks + ticker := time.NewTicker(httpPollInterval) + defer ticker.Stop() + + lastBlock := startBlock + chainIDUint, _ := c.chainID.Uint64() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + head, err := c.ethClient.InfoByLabel(ctx, eth.Unsafe) + if err != nil { + c.log.Warn("Failed to get head", "err", err) + continue + } + + headNum := head.NumberU64() + if headNum <= lastBlock { + continue + } + + // Ingest any missed blocks + for blockNum := lastBlock + 1; blockNum <= headNum; blockNum++ { + if err := c.ingestBlock(ctx, blockNum); err != nil { + c.log.Error("Failed to ingest block", "block", blockNum, "err", err) + // Check if this is a reorg + if errors.Is(err, suptypes.ErrConflict) { + onReorg(c.chainID, err) + return + } + } + c.metrics.RecordChainHead(chainIDUint, blockNum) + } + lastBlock = headNum + } + } +} + +func (c *ChainIngester) ingestBlock(ctx context.Context, blockNum uint64) error { + // Get block info + block, err := c.ethClient.InfoByNumber(ctx, blockNum) + if err != nil { + return fmt.Errorf("failed to get block: %w", err) + } + + // Get receipts - returns (BlockInfo, Receipts, error) + _, receipts, err := c.ethClient.FetchReceipts(ctx, block.Hash()) + if err != nil { + return fmt.Errorf("failed to get receipts: %w", err) + } + + // Get parent block for parent hash + parentBlock := eth.BlockID{ + Hash: block.ParentHash(), + Number: blockNum - 1, + } + + // Add all logs from receipts + var logIdx uint32 + for _, receipt := range receipts { + for _, log := range receipt.Logs { + execMsg := c.parseExecutingMessage(log) + logHash := logToHash(log) + + if err := c.logsDB.AddLog(logHash, parentBlock, logIdx, execMsg); err != nil { + return fmt.Errorf("failed to add log %d: %w", logIdx, err) + } + logIdx++ + } + } + + // Seal the block + blockID := eth.BlockID{Hash: block.Hash(), Number: blockNum} + if err := c.logsDB.SealBlock(block.ParentHash(), blockID, block.Time()); err != nil { + return fmt.Errorf("failed to seal block: %w", err) + } + + // Record LogsDB metrics + chainIDUint, _ := c.chainID.Uint64() + c.metrics.RecordLogsDBBlocksSealed(chainIDUint) + if logIdx > 0 { + c.metrics.RecordLogsDBLogsAdded(chainIDUint, int(logIdx)) + } + + return nil +} + +// parseExecutingMessage checks if a log is an executing message and parses it +func (c *ChainIngester) parseExecutingMessage(log *gethtypes.Log) *suptypes.ExecutingMessage { + // Check if this is a CrossL2Inbox executing message event + // Address: 0x4200000000000000000000000000000000000022 + if log.Address != params.InteropCrossL2InboxAddress { + return nil + } + if len(log.Topics) == 0 || log.Topics[0] != suptypes.ExecutingMessageEventTopic { + return nil + } + + // Parse the executing message using the processors package + execMsg, err := processors.DecodeExecutingMessageLog(log) + if err != nil { + c.log.Warn("Failed to decode executing message", "err", err) + return nil + } + return execMsg +} + +// logToHash computes the log hash used in LogsDB +func logToHash(log *gethtypes.Log) common.Hash { + return processors.LogToLogHash(log) +} + +// logsDBMetrics bridges logs.Metrics interface to our metrics.Metricer +type logsDBMetrics struct { + chainID uint64 + m metrics.Metricer +} + +func (l *logsDBMetrics) RecordDBEntryCount(kind string, count int64) { + // We track total entries via our own metric + l.m.RecordLogsDBEntries(l.chainID, count) +} + +func (l *logsDBMetrics) RecordDBSearchEntriesRead(count int64) { + // This tracks search efficiency, could add later if needed +} diff --git a/op-interop-filter/filter/config.go b/op-interop-filter/filter/config.go new file mode 100644 index 00000000000..dc00b19b145 --- /dev/null +++ b/op-interop-filter/filter/config.go @@ -0,0 +1,56 @@ +package filter + +import ( + "errors" + "fmt" + "time" + + "github.com/urfave/cli/v2" + + "github.com/ethereum-optimism/optimism/op-interop-filter/flags" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/oppprof" + oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" +) + +type Config struct { + L2RPCs []string + DataDir string + BackfillDuration time.Duration + Version string + + LogConfig oplog.CLIConfig + MetricsConfig opmetrics.CLIConfig + PprofConfig oppprof.CLIConfig + RPC oprpc.CLIConfig +} + +func (c *Config) Check() error { + var result error + if len(c.L2RPCs) == 0 { + result = errors.Join(result, errors.New("at least one L2 RPC is required")) + } + result = errors.Join(result, c.MetricsConfig.Check()) + result = errors.Join(result, c.PprofConfig.Check()) + result = errors.Join(result, c.RPC.Check()) + return result +} + +func NewConfig(ctx *cli.Context, version string) (*Config, error) { + backfillDuration, err := time.ParseDuration(ctx.String(flags.BackfillDurationFlag.Name)) + if err != nil { + return nil, fmt.Errorf("invalid backfill-duration: %w", err) + } + + return &Config{ + L2RPCs: ctx.StringSlice(flags.L2RPCsFlag.Name), + DataDir: ctx.String(flags.DataDirFlag.Name), + BackfillDuration: backfillDuration, + Version: version, + LogConfig: oplog.ReadCLIConfig(ctx), + MetricsConfig: opmetrics.ReadCLIConfig(ctx), + PprofConfig: oppprof.ReadCLIConfig(ctx), + RPC: oprpc.ReadCLIConfig(ctx), + }, nil +} diff --git a/op-interop-filter/filter/frontend.go b/op-interop-filter/filter/frontend.go new file mode 100644 index 00000000000..38ce8b8f09e --- /dev/null +++ b/op-interop-filter/filter/frontend.go @@ -0,0 +1,51 @@ +package filter + +import ( + "context" + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types" +) + +// QueryFrontend handles supervisor query RPC methods +type QueryFrontend struct { + backend *Backend +} + +// CheckAccessList validates interop executing messages +func (f *QueryFrontend) CheckAccessList(ctx context.Context, inboxEntries []common.Hash, + minSafety types.SafetyLevel, executingDescriptor types.ExecutingDescriptor) error { + + err := f.backend.CheckAccessList(ctx, inboxEntries, minSafety, executingDescriptor) + if err != nil { + // Map errors to appropriate RPC error codes + code := types.GetErrorCode(err) + if code != 0 { + return &rpc.JsonError{ + Code: code, + Message: err.Error(), + } + } + // For unknown errors, return as-is (will be internal error) + return err + } + return nil +} + +// AdminFrontend handles admin RPC methods +type AdminFrontend struct { + backend *Backend +} + +// GetFailsafeEnabled returns whether failsafe is enabled +func (a *AdminFrontend) GetFailsafeEnabled(ctx context.Context) (bool, error) { + return a.backend.FailsafeEnabled(), nil +} + +// SetFailsafeEnabled is not supported - failsafe is automatically managed +func (a *AdminFrontend) SetFailsafeEnabled(ctx context.Context, enabled bool) error { + return errors.New("SetFailsafeEnabled not supported: failsafe is automatically managed based on reorg detection") +} diff --git a/op-interop-filter/filter/service.go b/op-interop-filter/filter/service.go new file mode 100644 index 00000000000..59393981888 --- /dev/null +++ b/op-interop-filter/filter/service.go @@ -0,0 +1,229 @@ +package filter + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" + "github.com/urfave/cli/v2" + + opservice "github.com/ethereum-optimism/optimism/op-service" + "github.com/ethereum-optimism/optimism/op-service/cliapp" + "github.com/ethereum-optimism/optimism/op-service/httputil" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/oppprof" + oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" + + "github.com/ethereum-optimism/optimism/op-interop-filter/flags" + "github.com/ethereum-optimism/optimism/op-interop-filter/metrics" +) + +// Service is the main op-interop-filter service +type Service struct { + log log.Logger + metrics metrics.Metricer + version string + + pprofService *oppprof.Service + metricsSrv *httputil.HTTPServer + rpcServer *oprpc.Server + + backend *Backend + + stopped atomic.Bool +} + +var _ cliapp.Lifecycle = (*Service)(nil) + +// Main returns the main entrypoint for the service +func Main(version string) cliapp.LifecycleAction { + return func(cliCtx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.Lifecycle, error) { + if err := flags.CheckRequired(cliCtx); err != nil { + return nil, err + } + + cfg, err := NewConfig(cliCtx, version) + if err != nil { + return nil, fmt.Errorf("failed to parse config: %w", err) + } + if err := cfg.Check(); err != nil { + return nil, fmt.Errorf("invalid config: %w", err) + } + + l := oplog.NewLogger(oplog.AppOut(cliCtx), cfg.LogConfig) + oplog.SetGlobalLogHandler(l.Handler()) + opservice.ValidateEnvVars(flags.EnvVarPrefix, flags.Flags, l) + + l.Info("Initializing op-interop-filter", "version", version) + return NewService(cliCtx.Context, cfg, l) + } +} + +// NewService creates a new Service instance +func NewService(ctx context.Context, cfg *Config, logger log.Logger) (*Service, error) { + s := &Service{ + log: logger, + version: cfg.Version, + } + if err := s.init(ctx, cfg); err != nil { + return nil, errors.Join(err, s.Stop(ctx)) + } + return s, nil +} + +func (s *Service) init(ctx context.Context, cfg *Config) error { + s.initMetrics(cfg) + + if err := s.initPProf(cfg); err != nil { + return fmt.Errorf("failed to init pprof: %w", err) + } + if err := s.initMetricsServer(cfg); err != nil { + return fmt.Errorf("failed to init metrics server: %w", err) + } + if err := s.initBackend(ctx, cfg); err != nil { + return fmt.Errorf("failed to init backend: %w", err) + } + if err := s.initRPCServer(cfg); err != nil { + return fmt.Errorf("failed to init RPC server: %w", err) + } + return nil +} + +func (s *Service) initMetrics(cfg *Config) { + if cfg.MetricsConfig.Enabled { + s.metrics = metrics.NewMetrics("default") + s.metrics.RecordInfo(s.version) + } else { + s.metrics = metrics.NoopMetrics + } +} + +func (s *Service) initPProf(cfg *Config) error { + s.pprofService = oppprof.New( + cfg.PprofConfig.ListenEnabled, + cfg.PprofConfig.ListenAddr, + cfg.PprofConfig.ListenPort, + cfg.PprofConfig.ProfileType, + cfg.PprofConfig.ProfileDir, + cfg.PprofConfig.ProfileFilename, + ) + if err := s.pprofService.Start(); err != nil { + return fmt.Errorf("failed to start pprof: %w", err) + } + return nil +} + +func (s *Service) initMetricsServer(cfg *Config) error { + if !cfg.MetricsConfig.Enabled { + s.log.Info("Metrics disabled") + return nil + } + m, ok := s.metrics.(opmetrics.RegistryMetricer) + if !ok { + return fmt.Errorf("metrics do not expose registry") + } + metricsSrv, err := opmetrics.StartServer(m.Registry(), cfg.MetricsConfig.ListenAddr, cfg.MetricsConfig.ListenPort) + if err != nil { + return fmt.Errorf("failed to start metrics server: %w", err) + } + s.log.Info("Started metrics server", "addr", metricsSrv.Addr()) + s.metricsSrv = metricsSrv + return nil +} + +func (s *Service) initBackend(ctx context.Context, cfg *Config) error { + backend, err := NewBackend(ctx, s.log, s.metrics, cfg) + if err != nil { + return err + } + s.backend = backend + return nil +} + +func (s *Service) initRPCServer(cfg *Config) error { + server := oprpc.NewServer( + cfg.RPC.ListenAddr, + cfg.RPC.ListenPort, + s.version, + oprpc.WithLogger(s.log), + ) + + // Register supervisor query API + server.AddAPI(rpc.API{ + Namespace: "supervisor", + Service: &QueryFrontend{backend: s.backend}, + Authenticated: false, + }) + + // Register admin API (opt-in) + if cfg.RPC.EnableAdmin { + s.log.Info("Admin RPC enabled") + server.AddAPI(rpc.API{ + Namespace: "admin", + Service: &AdminFrontend{backend: s.backend}, + Authenticated: true, + }) + } + + s.rpcServer = server + return nil +} + +// Start starts the service +func (s *Service) Start(ctx context.Context) error { + s.log.Info("Starting op-interop-filter") + + // Start backend (begins block ingestion) + if err := s.backend.Start(ctx); err != nil { + return fmt.Errorf("failed to start backend: %w", err) + } + + // Start RPC server + if err := s.rpcServer.Start(); err != nil { + return fmt.Errorf("failed to start RPC server: %w", err) + } + s.log.Info("RPC server started", "endpoint", s.rpcServer.Endpoint()) + + s.metrics.RecordUp() + return nil +} + +// Stop stops the service +func (s *Service) Stop(ctx context.Context) error { + if !s.stopped.CompareAndSwap(false, true) { + return nil + } + s.log.Info("Stopping op-interop-filter") + + var result error + if s.rpcServer != nil { + if err := s.rpcServer.Stop(); err != nil { + result = errors.Join(result, fmt.Errorf("failed to stop RPC: %w", err)) + } + } + if s.backend != nil { + if err := s.backend.Stop(ctx); err != nil { + result = errors.Join(result, fmt.Errorf("failed to stop backend: %w", err)) + } + } + if s.pprofService != nil { + if err := s.pprofService.Stop(ctx); err != nil { + result = errors.Join(result, fmt.Errorf("failed to stop pprof: %w", err)) + } + } + if s.metricsSrv != nil { + if err := s.metricsSrv.Stop(ctx); err != nil { + result = errors.Join(result, fmt.Errorf("failed to stop metrics: %w", err)) + } + } + return result +} + +// Stopped returns true if the service has been stopped +func (s *Service) Stopped() bool { + return s.stopped.Load() +} diff --git a/op-interop-filter/flags/flags.go b/op-interop-filter/flags/flags.go new file mode 100644 index 00000000000..fd998022286 --- /dev/null +++ b/op-interop-filter/flags/flags.go @@ -0,0 +1,69 @@ +package flags + +import ( + "fmt" + + "github.com/urfave/cli/v2" + + opservice "github.com/ethereum-optimism/optimism/op-service" + oplog "github.com/ethereum-optimism/optimism/op-service/log" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum-optimism/optimism/op-service/oppprof" + oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" +) + +const EnvVarPrefix = "OP_INTEROP_FILTER" + +func prefixEnvVars(name string) []string { + return opservice.PrefixEnvVar(EnvVarPrefix, name) +} + +var ( + L2RPCsFlag = &cli.StringSliceFlag{ + Name: "l2-rpcs", + Usage: "L2 RPC endpoints to connect to (chain ID is queried from each endpoint)", + EnvVars: prefixEnvVars("L2_RPCS"), + } + DataDirFlag = &cli.StringFlag{ + Name: "data-dir", + Usage: "Directory for LogsDB storage. If empty, uses in-memory storage", + EnvVars: prefixEnvVars("DATA_DIR"), + Value: "", + } + BackfillDurationFlag = &cli.StringFlag{ + Name: "backfill-duration", + Usage: "Duration to backfill on startup (e.g., 24h, 30m, 1h30m)", + EnvVars: prefixEnvVars("BACKFILL_DURATION"), + Value: "24h", + } +) + +var requiredFlags = []cli.Flag{ + L2RPCsFlag, +} + +var optionalFlags = []cli.Flag{ + DataDirFlag, + BackfillDurationFlag, +} + +func init() { + optionalFlags = append(optionalFlags, oprpc.CLIFlags(EnvVarPrefix)...) + optionalFlags = append(optionalFlags, oplog.CLIFlags(EnvVarPrefix)...) + optionalFlags = append(optionalFlags, opmetrics.CLIFlags(EnvVarPrefix)...) + optionalFlags = append(optionalFlags, oppprof.CLIFlags(EnvVarPrefix)...) + + Flags = append(requiredFlags, optionalFlags...) +} + +var Flags []cli.Flag + +func CheckRequired(ctx *cli.Context) error { + for _, f := range requiredFlags { + name := f.Names()[0] + if !ctx.IsSet(name) { + return fmt.Errorf("flag %s is required", name) + } + } + return nil +} diff --git a/op-interop-filter/justfile b/op-interop-filter/justfile new file mode 100644 index 00000000000..f8333843443 --- /dev/null +++ b/op-interop-filter/justfile @@ -0,0 +1,20 @@ +import '../justfiles/go.just' + +# Build ldflags string +_LDFLAGSSTRING := "'" + trim( + "-X main.GitCommit=" + GITCOMMIT + " " + \ + "-X main.GitDate=" + GITDATE + " " + \ + "-X main.Version=" + VERSION + " " + \ + "") + "'" + +BINARY := "./bin/op-interop-filter" + +# Build op-interop-filter binary +op-interop-filter: (go_build BINARY "./cmd" "-ldflags" _LDFLAGSSTRING) + +# Clean build artifacts +clean: + rm -f {{BINARY}} + +# Run tests +test: (go_test "./...") diff --git a/op-interop-filter/metrics/metrics.go b/op-interop-filter/metrics/metrics.go new file mode 100644 index 00000000000..a3d6909efb8 --- /dev/null +++ b/op-interop-filter/metrics/metrics.go @@ -0,0 +1,221 @@ +package metrics + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" +) + +const Namespace = "op_interop_filter" + +type Metricer interface { + RecordInfo(version string) + RecordUp() + RecordFailsafeEnabled(enabled bool) + RecordChainHead(chainID uint64, blockNum uint64) + RecordCheckAccessList(success bool) + RecordReorgDetected(chainID uint64) + RecordChainReady(chainID uint64, ready bool) + RecordBackfillProgress(chainID uint64, current, total uint64) + RecordLogsDBFirstBlock(chainID uint64, blockNum uint64) + RecordLogsDBBlocksSealed(chainID uint64) + RecordLogsDBLogsAdded(chainID uint64, count int) + RecordLogsDBEntries(chainID uint64, count int64) +} + +type Metrics struct { + registry *prometheus.Registry + factory opmetrics.Factory + + info *prometheus.GaugeVec + up prometheus.Gauge + failsafeEnabled prometheus.Gauge + chainHead *prometheus.GaugeVec + checkAccessTotal *prometheus.CounterVec + + // Chain-specific metrics + reorgTotal *prometheus.CounterVec + chainReady *prometheus.GaugeVec + backfillProgress *prometheus.GaugeVec + logsDBFirstBlock *prometheus.GaugeVec + logsDBBlocksTotal *prometheus.CounterVec + logsDBLogsTotal *prometheus.CounterVec + logsDBEntries *prometheus.GaugeVec +} + +var _ Metricer = (*Metrics)(nil) +var _ opmetrics.RegistryMetricer = (*Metrics)(nil) + +func NewMetrics(_ string) *Metrics { + registry := opmetrics.NewRegistry() + factory := opmetrics.With(registry) + + return &Metrics{ + registry: registry, + factory: factory, + + info: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "info", + Help: "Service info", + }, []string{"version"}), + + up: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "up", + Help: "1 if service is up", + }), + + failsafeEnabled: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "failsafe_enabled", + Help: "1 if failsafe is enabled", + }), + + chainHead: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "chain_head", + Help: "Latest ingested block number", + }, []string{"chain_id"}), + + checkAccessTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "check_access_list_total", + Help: "Total checkAccessList requests", + }, []string{"success"}), + + reorgTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "reorg_total", + Help: "Total reorgs detected", + }, []string{"chain_id"}), + + chainReady: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "chain_ready", + Help: "1 if chain has finished backfill and is ready", + }, []string{"chain_id"}), + + backfillProgress: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "backfill_progress", + Help: "Backfill progress ratio (0-1)", + }, []string{"chain_id"}), + + logsDBFirstBlock: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "logsdb_first_block", + Help: "First block number in LogsDB", + }, []string{"chain_id"}), + + logsDBBlocksTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "logsdb_blocks_total", + Help: "Total blocks sealed in LogsDB", + }, []string{"chain_id"}), + + logsDBLogsTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: Namespace, + Name: "logsdb_logs_total", + Help: "Total logs added to LogsDB", + }, []string{"chain_id"}), + + logsDBEntries: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Name: "logsdb_entries", + Help: "Current number of entries in LogsDB", + }, []string{"chain_id"}), + } +} + +func (m *Metrics) Registry() *prometheus.Registry { + return m.registry +} + +func (m *Metrics) Document() []opmetrics.DocumentedMetric { + return m.factory.Document() +} + +func (m *Metrics) RecordInfo(version string) { + m.info.WithLabelValues(version).Set(1) +} + +func (m *Metrics) RecordUp() { + m.up.Set(1) +} + +func (m *Metrics) RecordFailsafeEnabled(enabled bool) { + if enabled { + m.failsafeEnabled.Set(1) + } else { + m.failsafeEnabled.Set(0) + } +} + +func (m *Metrics) RecordChainHead(chainID uint64, blockNum uint64) { + m.chainHead.WithLabelValues(strconv.FormatUint(chainID, 10)).Set(float64(blockNum)) +} + +func (m *Metrics) RecordCheckAccessList(success bool) { + label := "false" + if success { + label = "true" + } + m.checkAccessTotal.WithLabelValues(label).Inc() +} + +func (m *Metrics) RecordReorgDetected(chainID uint64) { + m.reorgTotal.WithLabelValues(strconv.FormatUint(chainID, 10)).Inc() +} + +func (m *Metrics) RecordChainReady(chainID uint64, ready bool) { + val := float64(0) + if ready { + val = 1 + } + m.chainReady.WithLabelValues(strconv.FormatUint(chainID, 10)).Set(val) +} + +func (m *Metrics) RecordBackfillProgress(chainID uint64, current, total uint64) { + progress := float64(0) + if total > 0 { + progress = float64(current) / float64(total) + } + m.backfillProgress.WithLabelValues(strconv.FormatUint(chainID, 10)).Set(progress) +} + +func (m *Metrics) RecordLogsDBFirstBlock(chainID uint64, blockNum uint64) { + m.logsDBFirstBlock.WithLabelValues(strconv.FormatUint(chainID, 10)).Set(float64(blockNum)) +} + +func (m *Metrics) RecordLogsDBBlocksSealed(chainID uint64) { + m.logsDBBlocksTotal.WithLabelValues(strconv.FormatUint(chainID, 10)).Inc() +} + +func (m *Metrics) RecordLogsDBLogsAdded(chainID uint64, count int) { + m.logsDBLogsTotal.WithLabelValues(strconv.FormatUint(chainID, 10)).Add(float64(count)) +} + +func (m *Metrics) RecordLogsDBEntries(chainID uint64, count int64) { + m.logsDBEntries.WithLabelValues(strconv.FormatUint(chainID, 10)).Set(float64(count)) +} + +// NoopMetrics is a no-op implementation for testing +var NoopMetrics Metricer = &noopMetrics{} + +type noopMetrics struct{} + +func (n *noopMetrics) RecordInfo(version string) {} +func (n *noopMetrics) RecordUp() {} +func (n *noopMetrics) RecordFailsafeEnabled(enabled bool) {} +func (n *noopMetrics) RecordChainHead(chainID uint64, blockNum uint64) {} +func (n *noopMetrics) RecordCheckAccessList(success bool) {} +func (n *noopMetrics) RecordReorgDetected(chainID uint64) {} +func (n *noopMetrics) RecordChainReady(chainID uint64, ready bool) {} +func (n *noopMetrics) RecordBackfillProgress(chainID uint64, current, total uint64) {} +func (n *noopMetrics) RecordLogsDBFirstBlock(chainID uint64, blockNum uint64) {} +func (n *noopMetrics) RecordLogsDBBlocksSealed(chainID uint64) {} +func (n *noopMetrics) RecordLogsDBLogsAdded(chainID uint64, count int) {} +func (n *noopMetrics) RecordLogsDBEntries(chainID uint64, count int64) {} diff --git a/op-interop-filter/scripts/run-overnight.sh b/op-interop-filter/scripts/run-overnight.sh new file mode 100755 index 00000000000..361b2cfbcac --- /dev/null +++ b/op-interop-filter/scripts/run-overnight.sh @@ -0,0 +1,176 @@ +#!/bin/bash +set -e + +# Run overnight test with full observability (multi-chain) +# +# Usage: +# ./scripts/run-overnight.sh +# +# This script: +# 1. Starts the filter service with metrics (OP Sepolia + Unichain Sepolia) +# 2. Starts spammers for each chain +# 3. Shows how to run the dashboard +# +# To stop: Ctrl+C or kill the background processes + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" +cd "$ROOT_DIR/.." + +# Load dev environment +if [ -f "$ROOT_DIR/.env.dev" ]; then + source "$ROOT_DIR/.env.dev" +fi + +# Chain config (set in .env.dev) +OP_SEPOLIA_CHAIN_ID="11155420" +UNICHAIN_SEPOLIA_CHAIN_ID="1301" + +# Validate required env vars +if [ -z "$OP_SEPOLIA_RPC" ]; then + echo "ERROR: OP_SEPOLIA_RPC not set. Create .env.dev or export it." + exit 1 +fi +if [ -z "$UNICHAIN_SEPOLIA_RPC" ]; then + echo "ERROR: UNICHAIN_SEPOLIA_RPC not set. Create .env.dev or export it." + exit 1 +fi + +# General config +BACKFILL_DURATION="${BACKFILL_DURATION:-5m}" +QUERY_INTERVAL="${QUERY_INTERVAL:-5s}" +NUM_QUERIES="${NUM_QUERIES:-0}" # 0 = run forever +BLOCK_RANGE="${BLOCK_RANGE:-100}" # Should be less than blocks in BACKFILL_DURATION + +# Ports +FILTER_RPC_PORT=8560 +FILTER_METRICS_PORT=7300 +SPAMMER_OP_METRICS_PORT=7301 +SPAMMER_UNI_METRICS_PORT=7302 + +echo "==================================================" +echo " OP-INTEROP-FILTER OVERNIGHT TEST (MULTI-CHAIN)" +echo "==================================================" +echo "" +echo "Chains:" +echo " OP Sepolia (${OP_SEPOLIA_CHAIN_ID}): $OP_SEPOLIA_RPC" +echo " Unichain Sepolia (${UNICHAIN_SEPOLIA_CHAIN_ID}): $UNICHAIN_SEPOLIA_RPC" +echo "" +echo "Configuration:" +echo " Backfill Duration: $BACKFILL_DURATION" +echo " Query Interval: $QUERY_INTERVAL" +echo " Num Queries: $NUM_QUERIES (0=forever)" +echo " Block Range: $BLOCK_RANGE" +echo "" + +# Build binaries +echo "Building binaries..." +go build -o ./bin/op-interop-filter ./op-interop-filter/cmd/main.go +go build -o ./bin/filter-spammer ./op-interop-filter/cmd/spammer/main.go +go build -o ./bin/filter-dashboard ./op-interop-filter/cmd/dashboard/main.go +echo "Build complete." +echo "" + +# Cleanup function +cleanup() { + echo "" + echo "Stopping services..." + kill $FILTER_PID 2>/dev/null || true + kill $SPAMMER_OP_PID 2>/dev/null || true + kill $SPAMMER_UNI_PID 2>/dev/null || true + echo "Done." + exit 0 +} +trap cleanup SIGINT SIGTERM + +# Start filter service with both chains +echo "Starting filter service (multi-chain)..." +./bin/op-interop-filter \ + --l2-rpcs="$OP_SEPOLIA_RPC" \ + --l2-rpcs="$UNICHAIN_SEPOLIA_RPC" \ + --backfill-duration="$BACKFILL_DURATION" \ + --rpc.port=$FILTER_RPC_PORT \ + --rpc.enable-admin \ + --metrics.enabled \ + --metrics.port=$FILTER_METRICS_PORT \ + --log.level=info \ + > /tmp/filter.log 2>&1 & +FILTER_PID=$! +echo " Filter PID: $FILTER_PID" +echo " RPC: http://localhost:$FILTER_RPC_PORT" +echo " Metrics: http://localhost:$FILTER_METRICS_PORT/metrics" +echo " Log: /tmp/filter.log" +echo "" + +# Wait for filter to be ready +echo "Waiting for filter to be ready..." +for i in {1..120}; do + if curl -s http://localhost:$FILTER_RPC_PORT -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","method":"admin_getFailsafeEnabled","params":[],"id":1}' | grep -q "result"; then + echo " Filter is responding!" + break + fi + sleep 2 +done +echo "" + +# Start spammer for OP Sepolia +echo "Starting spammer for OP Sepolia..." +./bin/filter-spammer \ + --l2-rpc="$OP_SEPOLIA_RPC" \ + --filter-rpc="http://localhost:$FILTER_RPC_PORT" \ + --chain-id=$OP_SEPOLIA_CHAIN_ID \ + --num-queries=$NUM_QUERIES \ + --block-range=$BLOCK_RANGE \ + --query-interval="$QUERY_INTERVAL" \ + --metrics.enabled \ + --metrics.port=$SPAMMER_OP_METRICS_PORT \ + --log.level=info \ + > /tmp/spammer-op.log 2>&1 & +SPAMMER_OP_PID=$! +echo " Spammer (OP Sepolia) PID: $SPAMMER_OP_PID" +echo " Metrics: http://localhost:$SPAMMER_OP_METRICS_PORT/metrics" +echo " Log: /tmp/spammer-op.log" +echo "" + +# Start spammer for Unichain Sepolia +echo "Starting spammer for Unichain Sepolia..." +./bin/filter-spammer \ + --l2-rpc="$UNICHAIN_SEPOLIA_RPC" \ + --filter-rpc="http://localhost:$FILTER_RPC_PORT" \ + --chain-id=$UNICHAIN_SEPOLIA_CHAIN_ID \ + --num-queries=$NUM_QUERIES \ + --block-range=$BLOCK_RANGE \ + --query-interval="$QUERY_INTERVAL" \ + --metrics.enabled \ + --metrics.port=$SPAMMER_UNI_METRICS_PORT \ + --log.level=info \ + > /tmp/spammer-uni.log 2>&1 & +SPAMMER_UNI_PID=$! +echo " Spammer (Unichain Sepolia) PID: $SPAMMER_UNI_PID" +echo " Metrics: http://localhost:$SPAMMER_UNI_METRICS_PORT/metrics" +echo " Log: /tmp/spammer-uni.log" +echo "" + +echo "==================================================" +echo " SERVICES RUNNING" +echo "==================================================" +echo "" +echo "To view dashboard, run in another terminal:" +echo " ./bin/filter-dashboard --spammer-metrics=http://localhost:$SPAMMER_OP_METRICS_PORT/metrics,http://localhost:$SPAMMER_UNI_METRICS_PORT/metrics" +echo "" +echo "To view logs:" +echo " tail -f /tmp/filter.log" +echo " tail -f /tmp/spammer-op.log" +echo " tail -f /tmp/spammer-uni.log" +echo "" +echo "To view raw metrics:" +echo " curl http://localhost:$FILTER_METRICS_PORT/metrics" +echo " curl http://localhost:$SPAMMER_OP_METRICS_PORT/metrics" +echo " curl http://localhost:$SPAMMER_UNI_METRICS_PORT/metrics" +echo "" +echo "Press Ctrl+C to stop all services" +echo "" + +# Wait for processes +wait diff --git a/ops/docker/op-stack-go/Dockerfile b/ops/docker/op-stack-go/Dockerfile index c0a77a6b22e..96c4943a5b9 100644 --- a/ops/docker/op-stack-go/Dockerfile +++ b/ops/docker/op-stack-go/Dockerfile @@ -174,6 +174,11 @@ ARG OP_SUPERNODE_VERSION=v0.0.0 RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-supernode && make op-supernode \ GOOS=$TARGETOS GOARCH=$TARGETARCH GITCOMMIT=$GIT_COMMIT GITDATE=$GIT_DATE VERSION="$OP_SUPERNODE_VERSION" +FROM --platform=$BUILDPLATFORM builder AS op-interop-filter-builder +ARG OP_INTEROP_FILTER_VERSION=v0.0.0 +RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-interop-filter && make op-interop-filter \ + GOOS=$TARGETOS GOARCH=$TARGETARCH GITCOMMIT=$GIT_COMMIT GITDATE=$GIT_DATE VERSION="$OP_INTEROP_FILTER_VERSION" + FROM --platform=$BUILDPLATFORM builder AS op-test-sequencer-builder ARG OP_TEST_SEQUENCER_VERSION=v0.0.0 RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build cd op-test-sequencer && make op-test-sequencer \ @@ -271,6 +276,10 @@ FROM $TARGET_BASE_IMAGE AS op-supernode-target COPY --from=op-supernode-builder /app/op-supernode/bin/op-supernode /usr/local/bin/ CMD ["op-supernode"] +FROM $TARGET_BASE_IMAGE AS op-interop-filter-target +COPY --from=op-interop-filter-builder /app/op-interop-filter/bin/op-interop-filter /usr/local/bin/ +CMD ["op-interop-filter"] + FROM $TARGET_BASE_IMAGE AS op-test-sequencer-target COPY --from=op-test-sequencer-builder /app/op-test-sequencer/bin/op-test-sequencer /usr/local/bin/ CMD ["op-test-sequencer"] diff --git a/ops/docker/op-stack-go/Dockerfile.dockerignore b/ops/docker/op-stack-go/Dockerfile.dockerignore index 280b1603456..db630bdb08b 100644 --- a/ops/docker/op-stack-go/Dockerfile.dockerignore +++ b/ops/docker/op-stack-go/Dockerfile.dockerignore @@ -20,6 +20,7 @@ !/op-service !/op-supervisor !/op-supernode +!/op-interop-filter !/op-test-sequencer !/op-wheel !/op-alt-da