330 changes: 330 additions & 0 deletions cmd/integration/commands/commitment.go
@@ -23,12 +23,17 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/c2h5oh/datasize"
"github.com/go-echarts/go-echarts/v2/charts"
@@ -69,6 +74,12 @@ var (
visualizeDepth int
)

// bench-lookup command flags
var (
benchSampleSize int
benchSeed int64
)

func init() {

// commitment branch
@@ -108,6 +119,14 @@ func init() {
cmdCommitmentVisualize.Flags().IntVar(&visualizeDepth, "depth", 0, "depth of the prefixes to analyze")
commitmentCmd.AddCommand(cmdCommitmentVisualize)

// commitment bench-lookup
withChain(cmdCommitmentBenchLookup)
withDataDir(cmdCommitmentBenchLookup)
withConfig(cmdCommitmentBenchLookup)
cmdCommitmentBenchLookup.Flags().IntVar(&benchSampleSize, "sample-size", 10000000, "number of random keys to sample via reservoir sampling")
cmdCommitmentBenchLookup.Flags().Int64Var(&benchSeed, "seed", 0, "random seed for sampling (0 = use current time)")
commitmentCmd.AddCommand(cmdCommitmentBenchLookup)

rootCmd.AddCommand(commitmentCmd)

}
@@ -349,6 +368,317 @@ Examples:
},
}

// integration commitment bench-lookup
var cmdCommitmentBenchLookup = &cobra.Command{
Use: "bench-lookup",
Short: "Benchmark commitment domain lookup performance",
Long: `Benchmarks trie node lookup times in the commitment domain.
Uses reservoir sampling to select random keys and measures lookup latencies.

Examples:
integration commitment bench-lookup --chain=mainnet --datadir ~/data/eth-mainnet
integration commitment bench-lookup --datadir /path/to/datadir --sample-size 100000
integration commitment bench-lookup --datadir /path/to/datadir --seed 12345`,
Run: func(cmd *cobra.Command, args []string) {
logger := debug.SetupCobra(cmd, "integration")
ctx, _ := common.RootContext()

if err := benchLookup(ctx, logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error(err.Error())
}
return
}
},
}

type BenchStats struct {
Count int
TotalTime time.Duration
Min time.Duration
Max time.Duration
Mean time.Duration
Median time.Duration
P50 time.Duration
P90 time.Duration
P95 time.Duration
P99 time.Duration
P999 time.Duration
StdDev time.Duration
Throughput float64 // ops/sec
}

func benchLookup(ctx context.Context, logger log.Logger) error {
if benchSampleSize <= 0 {
return fmt.Errorf("sample-size must be positive, got %d", benchSampleSize)
}

seed := benchSeed
if seed == 0 {
seed = time.Now().UnixNano()
}
rng := rand.New(rand.NewSource(seed))

dirs := datadir.New(datadirCli)
chainDb, err := openDB(dbCfg(dbcfg.ChainDB, dirs.Chaindata), true, chain, logger)
if err != nil {
return fmt.Errorf("opening DB: %w", err)
}
defer chainDb.Close()

agg := chainDb.(dbstate.HasAgg).Agg().(*dbstate.Aggregator)
agg.DisableAllDependencies()

acRo := agg.BeginFilesRo()
defer acRo.Close()

// Sample keys from the commitment domain using reservoir sampling
logger.Info("Sampling keys from commitment domain files...", "sampleSize", benchSampleSize)

keys, totalCount, err := sampleCommitmentKeysFromFiles(ctx, acRo, benchSampleSize, rng, logger)
if err != nil {
return fmt.Errorf("failed to sample keys: %w", err)
}

if len(keys) == 0 {
logger.Warn("No keys found in commitment domain")
return nil
}

logger.Info("Sampled keys",
"sampledKeys", len(keys),
"totalKeysInFiles", totalCount,
"sampleRate", fmt.Sprintf("%.4f%%", float64(len(keys))/float64(totalCount)*100))

// Benchmark lookups
logger.Info("Benchmarking lookups...", "keyCount", len(keys))

// Shuffle keys to randomize access pattern
rng.Shuffle(len(keys), func(i, j int) {
keys[i], keys[j] = keys[j], keys[i]
})

tx, err := chainDb.BeginTemporalRo(ctx)
if err != nil {
return fmt.Errorf("failed to begin temporal tx: %w", err)
}
defer tx.Rollback()

commitmentReader := commitmentdb.NewLatestStateReader(tx)
durations := make([]time.Duration, len(keys))
var totalSize int64

startTime := time.Now()
for i, key := range keys {
lookupStart := time.Now()
val, _, err := commitmentReader.Read(kv.CommitmentDomain, key, config3.DefaultStepSize)
durations[i] = time.Since(lookupStart)

if err != nil {
logger.Warn("Lookup failed", "key", fmt.Sprintf("%x", key), "error", err)
continue
}
totalSize += int64(len(val))

// Progress logging
if (i+1)%10000 == 0 {
elapsed := time.Since(startTime)
opsPerSec := float64(i+1) / elapsed.Seconds()
logger.Info("Progress", "completed", i+1, "total", len(keys), "ops/sec", fmt.Sprintf("%.0f", opsPerSec))
}
}
totalBenchTime := time.Since(startTime)

// Calculate statistics
stats := calculateBenchStats(durations)
stats.Throughput = float64(len(keys)) / totalBenchTime.Seconds()

// Print results
printBenchResults("Commitment Domain Lookups", stats, totalSize, len(keys), totalCount)

return nil
}

// sampleCommitmentKeysFromFiles samples keys from all commitment domain .kv files using reservoir sampling.
// It iterates over every key-value pair in every .kv file, so the reported total key count is exact.
func sampleCommitmentKeysFromFiles(ctx context.Context, acRo *dbstate.AggregatorRoTx, sampleSize int, rng *rand.Rand, logger log.Logger) ([][]byte, int, error) {
keys := make([][]byte, 0, sampleSize)
globalCount := 0
lastLog := time.Now()

commitmentFiles := acRo.Files(kv.CommitmentDomain)

// consider .kv files only
var commitmentKVFiles []dbstate.VisibleFile
for _, f := range commitmentFiles {
if strings.HasSuffix(f.Fullpath(), ".kv") {
commitmentKVFiles = append(commitmentKVFiles, f)
}
}
logger.Info("Found commitment .kv files", "kvFiles", len(commitmentKVFiles), "totalFiles", len(commitmentFiles))

for fileIdx, f := range commitmentKVFiles {
fpath := f.Fullpath()
logger.Info("Scanning file...", "file", filepath.Base(fpath), "fileIdx", fileIdx+1, "totalFiles", len(commitmentKVFiles))

dec, err := seg.NewDecompressor(fpath)
if err != nil {
return nil, 0, fmt.Errorf("failed to create decompressor for %s: %w", fpath, err)
}
defer dec.Close() // deferred closes accumulate; every decompressor stays open until this function returns

fc := statecfg.Schema.GetDomainCfg(kv.CommitmentDomain).Compression
getter := seg.NewReader(dec.MakeGetter(), fc)

fileKeyCount := 0
for getter.HasNext() {
key, _ := getter.Next(nil)
if !getter.HasNext() {
return nil, 0, fmt.Errorf("invalid key/value pair in %s", fpath)
}
getter.Skip() // skip value

// Skip the "state" key
if bytes.Equal(key, []byte("state")) {
continue
}

globalCount++
fileKeyCount++

if len(keys) < sampleSize {
// Reservoir not full yet - always add
keyCopy := make([]byte, len(key))
copy(keyCopy, key)
keys = append(keys, keyCopy)
} else {
// Reservoir full - replace with probability sampleSize/globalCount
j := rng.Intn(globalCount)
if j < sampleSize {
if len(keys[j]) != len(key) {
keys[j] = make([]byte, len(key))
}
copy(keys[j], key)
}
}

if time.Since(lastLog) > 10*time.Second {
logger.Info("Sampling...",
"file", filepath.Base(fpath),
"fileKeys", fileKeyCount,
"globalScanned", globalCount,
"reservoir", len(keys))
lastLog = time.Now()
}

select {
case <-ctx.Done():
return nil, 0, ctx.Err()
default:
}
}
logger.Info("File complete", "file", filepath.Base(fpath), "keysInFile", fileKeyCount)
}

return keys, globalCount, nil
}
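
// The loop above is classic reservoir sampling (Algorithm R). For reference,
// here is a minimal, self-contained sketch of the same technique, decoupled
// from seg decompressors. reservoirSample and its channel input are
// illustrative only and are not part of this package.
func reservoirSample(items <-chan []byte, k int, rng *rand.Rand) (sample [][]byte, seen int) {
	sample = make([][]byte, 0, k)
	for item := range items {
		seen++
		if len(sample) < k {
			// Fill phase: the first k items are always kept.
			sample = append(sample, append([]byte(nil), item...))
			continue
		}
		// Replacement phase: the i-th item (1-based) displaces a random
		// reservoir slot with probability k/i, which keeps every item seen
		// so far equally likely to end up in the sample.
		if j := rng.Intn(seen); j < k {
			sample[j] = append(sample[j][:0], item...)
		}
	}
	return sample, seen
}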

func calculateBenchStats(durations []time.Duration) BenchStats {
if len(durations) == 0 {
return BenchStats{}
}

// Sort for percentile calculations
sorted := make([]time.Duration, len(durations))
copy(sorted, durations)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] < sorted[j]
})

// Calculate basic stats
var total time.Duration
minD := sorted[0]
maxD := sorted[len(sorted)-1]

for _, d := range sorted {
total += d
}

mean := total / time.Duration(len(sorted))

// Calculate standard deviation
var sumSquaredDiff float64
meanFloat := float64(mean)
for _, d := range sorted {
diff := float64(d) - meanFloat
sumSquaredDiff += diff * diff
}
variance := sumSquaredDiff / float64(len(sorted))
stdDev := time.Duration(math.Sqrt(variance))

return BenchStats{
Count: len(durations),
TotalTime: total,
Min: minD,
Max: maxD,
Mean: mean,
Median: benchPercentile(sorted, 0.50),
P50: benchPercentile(sorted, 0.50),
P90: benchPercentile(sorted, 0.90),
P95: benchPercentile(sorted, 0.95),
P99: benchPercentile(sorted, 0.99),
P999: benchPercentile(sorted, 0.999),
StdDev: stdDev,
}
}
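
// calculateBenchStats takes two passes over the data, which is fine since all
// durations are already held in memory. As a sketch of an alternative design,
// Welford's online algorithm computes the same population standard deviation
// in a single pass without retaining samples; welfordStdDev is illustrative
// only and is not called by the benchmark.
func welfordStdDev(durations []time.Duration) time.Duration {
	if len(durations) < 2 {
		return 0
	}
	var mean, m2 float64
	for i, d := range durations {
		x := float64(d)
		delta := x - mean
		mean += delta / float64(i+1)
		m2 += delta * (x - mean)
	}
	// Divide by n (population variance) to match calculateBenchStats above.
	return time.Duration(math.Sqrt(m2 / float64(len(durations))))
}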

func benchPercentile(sorted []time.Duration, p float64) time.Duration {
if len(sorted) == 0 {
return 0
}
idx := int(float64(len(sorted)-1) * p)
if idx >= len(sorted) {
idx = len(sorted) - 1
}
return sorted[idx]
}
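
// Worked example of the nearest-rank lookup above (illustrative numbers):
// with len(sorted) == 10000, benchPercentile(sorted, 0.99) computes
// idx = int(float64(9999) * 0.99) = 9899, so P99 is the 9900th-smallest
// sample, i.e. the latency that 99% of lookups complete at or below.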

func printBenchResults(name string, stats BenchStats, totalSize int64, keyCount int, totalKeysInFiles int) {
avgValueSize := float64(totalSize) / float64(keyCount)

fmt.Println()
fmt.Println("================================================================================")
fmt.Printf(" BENCHMARK RESULTS: %s\n", name)
fmt.Println("================================================================================")
fmt.Println()
fmt.Printf(" Total Keys in Files: %d\n", totalKeysInFiles)
fmt.Printf(" Sampled Keys: %d\n", keyCount)
fmt.Printf(" Sample Rate: %.4f%%\n", float64(keyCount)/float64(totalKeysInFiles)*100)
fmt.Println()
fmt.Printf(" Total Lookups: %d\n", stats.Count)
fmt.Printf(" Total Time: %v\n", stats.TotalTime)
fmt.Printf(" Throughput: %.2f ops/sec\n", stats.Throughput)
fmt.Printf(" Avg Value Size: %.2f bytes\n", avgValueSize)
fmt.Println()
fmt.Println(" Latency Statistics:")
fmt.Println(" -------------------")
fmt.Printf(" Min: %v\n", stats.Min)
fmt.Printf(" Max: %v\n", stats.Max)
fmt.Printf(" Mean: %v\n", stats.Mean)
fmt.Printf(" Std Dev: %v\n", stats.StdDev)
fmt.Println()
fmt.Println(" Percentiles:")
fmt.Println(" ------------")
fmt.Printf(" P50 (Median): %v\n", stats.P50)
fmt.Printf(" P90: %v\n", stats.P90)
fmt.Printf(" P95: %v\n", stats.P95)
fmt.Printf(" P99: %v\n", stats.P99)
fmt.Printf(" P99.9: %v\n", stats.P999)
fmt.Println()
fmt.Println("================================================================================")
fmt.Println()
}

func visualizeCommitmentFiles(files []string) {
sema := make(chan struct{}, visualizeConcurrency)
for i := 0; i < cap(sema); i++ {