diff --git a/cmd/integration/commands/commitment.go b/cmd/integration/commands/commitment.go index 3cd3bfa282a..b5db9810b52 100644 --- a/cmd/integration/commands/commitment.go +++ b/cmd/integration/commands/commitment.go @@ -23,12 +23,17 @@ import ( "errors" "fmt" "io" + "math" + "math/rand" "os" "path" "path/filepath" "runtime" + "sort" "strconv" + "strings" "sync" + "time" "github.com/c2h5oh/datasize" "github.com/go-echarts/go-echarts/v2/charts" @@ -69,6 +74,12 @@ var ( visualizeDepth int ) +// bench-lookup command flags +var ( + benchSampleSize int + benchSeed int64 +) + func init() { // commitment branch @@ -108,6 +119,14 @@ func init() { cmdCommitmentVisualize.Flags().IntVar(&visualizeDepth, "depth", 0, "depth of the prefixes to analyze") commitmentCmd.AddCommand(cmdCommitmentVisualize) + // commitment bench-lookup + withChain(cmdCommitmentBenchLookup) + withDataDir(cmdCommitmentBenchLookup) + withConfig(cmdCommitmentBenchLookup) + cmdCommitmentBenchLookup.Flags().IntVar(&benchSampleSize, "sample-size", 10000000, "number of random keys to sample via reservoir sampling") + cmdCommitmentBenchLookup.Flags().Int64Var(&benchSeed, "seed", 0, "random seed for sampling (0 = use current time)") + commitmentCmd.AddCommand(cmdCommitmentBenchLookup) + rootCmd.AddCommand(commitmentCmd) } @@ -349,6 +368,317 @@ Examples: }, } +// integration commitment bench-lookup +var cmdCommitmentBenchLookup = &cobra.Command{ + Use: "bench-lookup", + Short: "Benchmark commitment domain lookup performance", + Long: `Benchmarks trie node lookup times in the commitment domain. +Uses reservoir sampling to select random keys and measures lookup latencies. + +Examples: + integration commitment bench-lookup --chain=mainnet --datadir ~/data/eth-mainnet + integration commitment bench-lookup --datadir /path/to/datadir --sample-size 100000 + integration commitment bench-lookup --datadir /path/to/datadir --seed 12345`, + Run: func(cmd *cobra.Command, args []string) { + logger := debug.SetupCobra(cmd, "integration") + ctx, _ := common.RootContext() + + if err := benchLookup(ctx, logger); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error(err.Error()) + } + return + } + }, +} + +type BenchStats struct { + Count int + TotalTime time.Duration + Min time.Duration + Max time.Duration + Mean time.Duration + Median time.Duration + P50 time.Duration + P90 time.Duration + P95 time.Duration + P99 time.Duration + P999 time.Duration + StdDev time.Duration + Throughput float64 // ops/sec +} + +func benchLookup(ctx context.Context, logger log.Logger) error { + if benchSampleSize <= 0 { + return fmt.Errorf("sample-size must be positive, got %d", benchSampleSize) + } + + seed := benchSeed + if seed == 0 { + seed = time.Now().UnixNano() + } + rng := rand.New(rand.NewSource(seed)) + + dirs := datadir.New(datadirCli) + chainDb, err := openDB(dbCfg(dbcfg.ChainDB, dirs.Chaindata), true, chain, logger) + if err != nil { + return fmt.Errorf("opening DB: %w", err) + } + defer chainDb.Close() + + agg := chainDb.(dbstate.HasAgg).Agg().(*dbstate.Aggregator) + agg.DisableAllDependencies() + + acRo := agg.BeginFilesRo() + defer acRo.Close() + + // Sample keys from commtiment domain using reservoir sampling + logger.Info("Sampling keys from commitment domain files...", "sampleSize", benchSampleSize) + + keys, totalCount, err := sampleCommitmentKeysFromFiles(ctx, acRo, benchSampleSize, rng, logger) + if err != nil { + return fmt.Errorf("failed to sample keys: %w", err) + } + + if len(keys) == 0 { + logger.Warn("No keys found in commitment domain") + return nil + } + + logger.Info("Sampled keys", + "sampledKeys", len(keys), + "totalKeysInFiles", totalCount, + "sampleRate", fmt.Sprintf("%.4f%%", float64(len(keys))/float64(totalCount)*100)) + + // Benchmark lookups + logger.Info("Benchmarking lookups...", "keyCount", len(keys)) + + // Shuffle keys to randomize access pattern + rng.Shuffle(len(keys), func(i, j int) { + keys[i], keys[j] = keys[j], keys[i] + }) + + tx, err := chainDb.BeginTemporalRo(ctx) + if err != nil { + return fmt.Errorf("failed to begin temporal tx: %w", err) + } + defer tx.Rollback() + + commitmentReader := commitmentdb.NewLatestStateReader(tx) + durations := make([]time.Duration, len(keys)) + var totalSize int64 + + startTime := time.Now() + for i, key := range keys { + lookupStart := time.Now() + val, _, err := commitmentReader.Read(kv.CommitmentDomain, key, config3.DefaultStepSize) + durations[i] = time.Since(lookupStart) + + if err != nil { + logger.Warn("Lookup failed", "key", fmt.Sprintf("%x", key), "error", err) + continue + } + totalSize += int64(len(val)) + + // Progress logging + if (i+1)%10000 == 0 { + elapsed := time.Since(startTime) + opsPerSec := float64(i+1) / elapsed.Seconds() + logger.Info("Progress", "completed", i+1, "total", len(keys), "ops/sec", fmt.Sprintf("%.0f", opsPerSec)) + } + } + totalBenchTime := time.Since(startTime) + + // Calculate statistics + stats := calculateBenchStats(durations) + stats.Throughput = float64(len(keys)) / totalBenchTime.Seconds() + + // Print results + printBenchResults("Commitment Domain Lookups", stats, totalSize, len(keys), totalCount) + + return nil +} + +// sampleCommitmentKeysFromFiles samples keys from all commitment domain .kv files using reservoir sampling. +// This iterates over ALL key-value pairs in ALL .kv files +func sampleCommitmentKeysFromFiles(ctx context.Context, acRo *dbstate.AggregatorRoTx, sampleSize int, rng *rand.Rand, logger log.Logger) ([][]byte, int, error) { + keys := make([][]byte, 0, sampleSize) + globalCount := 0 + lastLog := time.Now() + + commitmentFiles := acRo.Files(kv.CommitmentDomain) + + // consider .kv files only + var commitmentKVFiles []dbstate.VisibleFile + for _, f := range commitmentFiles { + if strings.HasSuffix(f.Fullpath(), ".kv") { + commitmentKVFiles = append(commitmentKVFiles, f) + } + } + logger.Info("Found commitment .kv files", "kvFiles", len(commitmentKVFiles), "totalFiles", len(commitmentFiles)) + + for fileIdx, f := range commitmentKVFiles { + fpath := f.Fullpath() + logger.Info("Scanning file...", "file", filepath.Base(fpath), "fileIdx", fileIdx+1, "totalFiles", len(commitmentKVFiles)) + + dec, err := seg.NewDecompressor(fpath) + if err != nil { + return nil, 0, fmt.Errorf("failed to create decompressor for %s: %w", fpath, err) + } + defer dec.Close() + + fc := statecfg.Schema.GetDomainCfg(kv.CommitmentDomain).Compression + getter := seg.NewReader(dec.MakeGetter(), fc) + + fileKeyCount := 0 + for getter.HasNext() { + key, _ := getter.Next(nil) + if !getter.HasNext() { + return nil, 0, fmt.Errorf("invalid key/value pair in %s", fpath) + } + getter.Skip() // skip value + + // Skip the "state" key + if bytes.Equal(key, []byte("state")) { + continue + } + + globalCount++ + fileKeyCount++ + + if len(keys) < sampleSize { + // Reservoir not full yet - always add + keyCopy := make([]byte, len(key)) + copy(keyCopy, key) + keys = append(keys, keyCopy) + } else { + // Reservoir full - replace with probability sampleSize/globalCount + j := rng.Intn(globalCount) + if j < sampleSize { + if len(keys[j]) != len(key) { + keys[j] = make([]byte, len(key)) + } + copy(keys[j], key) + } + } + + if time.Since(lastLog) > 10*time.Second { + logger.Info("Sampling...", + "file", filepath.Base(fpath), + "fileKeys", fileKeyCount, + "globalScanned", globalCount, + "reservoir", len(keys)) + lastLog = time.Now() + } + + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + } + logger.Info("File complete", "file", filepath.Base(fpath), "keysInFile", fileKeyCount) + } + + return keys, globalCount, nil +} + +func calculateBenchStats(durations []time.Duration) BenchStats { + if len(durations) == 0 { + return BenchStats{} + } + + // Sort for percentile calculations + sorted := make([]time.Duration, len(durations)) + copy(sorted, durations) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i] < sorted[j] + }) + + // Calculate basic stats + var total time.Duration + minD := sorted[0] + maxD := sorted[len(sorted)-1] + + for _, d := range sorted { + total += d + } + + mean := total / time.Duration(len(sorted)) + + // Calculate standard deviation + var sumSquaredDiff float64 + meanFloat := float64(mean) + for _, d := range sorted { + diff := float64(d) - meanFloat + sumSquaredDiff += diff * diff + } + variance := sumSquaredDiff / float64(len(sorted)) + stdDev := time.Duration(math.Sqrt(variance)) + + return BenchStats{ + Count: len(durations), + TotalTime: total, + Min: minD, + Max: maxD, + Mean: mean, + Median: benchPercentile(sorted, 0.50), + P50: benchPercentile(sorted, 0.50), + P90: benchPercentile(sorted, 0.90), + P95: benchPercentile(sorted, 0.95), + P99: benchPercentile(sorted, 0.99), + P999: benchPercentile(sorted, 0.999), + StdDev: stdDev, + } +} + +func benchPercentile(sorted []time.Duration, p float64) time.Duration { + if len(sorted) == 0 { + return 0 + } + idx := int(float64(len(sorted)-1) * p) + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} + +func printBenchResults(name string, stats BenchStats, totalSize int64, keyCount int, totalKeysInFiles int) { + avgValueSize := float64(totalSize) / float64(keyCount) + + fmt.Println() + fmt.Println("================================================================================") + fmt.Printf(" BENCHMARK RESULTS: %s\n", name) + fmt.Println("================================================================================") + fmt.Println() + fmt.Printf(" Total Keys in Files: %d\n", totalKeysInFiles) + fmt.Printf(" Sampled Keys: %d\n", keyCount) + fmt.Printf(" Sample Rate: %.4f%%\n", float64(keyCount)/float64(totalKeysInFiles)*100) + fmt.Println() + fmt.Printf(" Total Lookups: %d\n", stats.Count) + fmt.Printf(" Total Time: %v\n", stats.TotalTime) + fmt.Printf(" Throughput: %.2f ops/sec\n", stats.Throughput) + fmt.Printf(" Avg Value Size: %.2f bytes\n", avgValueSize) + fmt.Println() + fmt.Println(" Latency Statistics:") + fmt.Println(" -------------------") + fmt.Printf(" Min: %v\n", stats.Min) + fmt.Printf(" Max: %v\n", stats.Max) + fmt.Printf(" Mean: %v\n", stats.Mean) + fmt.Printf(" Std Dev: %v\n", stats.StdDev) + fmt.Println() + fmt.Println(" Percentiles:") + fmt.Println(" ------------") + fmt.Printf(" P50 (Median): %v\n", stats.P50) + fmt.Printf(" P90: %v\n", stats.P90) + fmt.Printf(" P95: %v\n", stats.P95) + fmt.Printf(" P99: %v\n", stats.P99) + fmt.Printf(" P99.9: %v\n", stats.P999) + fmt.Println() + fmt.Println("================================================================================") + fmt.Println() +} + func visualizeCommitmentFiles(files []string) { sema := make(chan struct{}, visualizeConcurrency) for i := 0; i < cap(sema); i++ {