diff --git a/core/blockchain.go b/core/blockchain.go index c161f0e577..09c5722608 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -187,6 +187,10 @@ type BlockChainConfig struct { StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top ArchiveMode bool // Whether to enable the archive mode + // Address-specific cache sizes for biased caching (pathdb only) + // Maps account address to cache size in bytes + AddressCacheSizes map[common.Address]int + // State snapshot related options SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory SnapshotNoBuild bool // Whether the background generation is allowed @@ -282,8 +286,9 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config { // TODO(rjl493456442): The write buffer represents the memory limit used // for flushing both trie data and state data to disk. The config name // should be updated to eliminate the confusion. - WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024, - NoAsyncFlush: cfg.TrieNoAsyncFlush, + WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024, + NoAsyncFlush: cfg.TrieNoAsyncFlush, + AddressCacheSizes: cfg.AddressCacheSizes, } } return config diff --git a/eth/backend.go b/eth/backend.go index 24cdd8076c..c41da0e406 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -256,18 +256,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } } options := &core.BlockChainConfig{ - TrieCleanLimit: config.TrieCleanCache, - NoPrefetch: config.NoPrefetch, - TrieDirtyLimit: config.TrieDirtyCache, - ArchiveMode: config.NoPruning, - TrieTimeLimit: config.TrieTimeout, - SnapshotLimit: config.SnapshotCache, - Preimages: config.Preimages, - StateHistory: config.StateHistory, - StateScheme: scheme, - TriesInMemory: config.TriesInMemory, - ChainHistoryMode: config.HistoryMode, - TxLookupLimit: int64(min(config.TransactionHistory, math.MaxInt64)), + TrieCleanLimit: config.TrieCleanCache, + NoPrefetch: config.NoPrefetch, + TrieDirtyLimit: config.TrieDirtyCache, + ArchiveMode: config.NoPruning, + TrieTimeLimit: config.TrieTimeout, + SnapshotLimit: config.SnapshotCache, + Preimages: config.Preimages, + StateHistory: config.StateHistory, + StateScheme: scheme, + TriesInMemory: config.TriesInMemory, + ChainHistoryMode: config.HistoryMode, + TxLookupLimit: int64(min(config.TransactionHistory, math.MaxInt64)), + AddressCacheSizes: config.AddressCacheSizes, VmConfig: vm.Config{ EnablePreimageRecording: config.EnablePreimageRecording, }, diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 17ddf77187..3afdd9c6c6 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -148,6 +148,10 @@ type Config struct { // This is the number of blocks for which logs will be cached in the filter system. FilterLogCacheSize int + // Address-specific cache sizes for biased caching (pathdb only) + // Maps account address to cache size in bytes + AddressCacheSizes map[common.Address]int + // Mining options Miner miner.Config diff --git a/internal/cli/server/config.go b/internal/cli/server/config.go index 6e1ecdb88a..fd51f12cd0 100644 --- a/internal/cli/server/config.go +++ b/internal/cli/server/config.go @@ -621,6 +621,12 @@ type CacheConfig struct { // Raise the open file descriptor resource limit (default = system fd limit) FDLimit int `hcl:"fdlimit,optional" toml:"fdlimit,optional"` + // Address-specific cache sizes for biased caching (format: "address=sizeMB,address=sizeMB") + // Size is specified in MB (megabytes) + // Example: "0x1234...=1024,0x5678...=512" (1024MB and 512MB) + AddressCacheSizesRaw string `hcl:"addresscachesizes,optional" toml:"addresscachesizes,optional"` + AddressCacheSizes map[string]string `hcl:"-,optional" toml:"-"` + // GC settings // GoMemLimit sets the soft memory limit for the runtime GoMemLimit string `hcl:"gomemlimit,optional" toml:"gomemlimit,optional"` @@ -1280,6 +1286,16 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* n.TrieTimeout = c.Cache.TrieTimeout n.TriesInMemory = c.Cache.TriesInMemory n.FilterLogCacheSize = c.Cache.FilterLogCacheSize + + // Parse address-specific cache sizes + if c.Cache.AddressCacheSizesRaw != "" { + addressCacheSizes, err := parseAddressCacheSizes(c.Cache.AddressCacheSizesRaw) + if err != nil { + log.Warn("Failed to parse address cache sizes", "error", err) + } else { + n.AddressCacheSizes = addressCacheSizes + } + } } // History @@ -1392,6 +1408,51 @@ func (c *Config) buildEth(stack *node.Node, accountManager *accounts.Manager) (* return &n, nil } +// parseAddressCacheSizes parses address cache sizes from a string format +// Expected format: "address1=sizeMB1,address2=sizeMB2,..." +// Sizes are specified in MB (megabytes) and converted to bytes +// Example: "0x1234...=1024,0x5678...=512" means 1024MB and 512MB +func parseAddressCacheSizes(input string) (map[common.Address]int, error) { + result := make(map[common.Address]int) + if input == "" { + return result, nil + } + + // Split by comma + pairs := strings.Split(input, ",") + for _, pair := range pairs { + pair = strings.TrimSpace(pair) + if pair == "" { + continue + } + + // Split by equals + parts := strings.SplitN(pair, "=", 2) + if len(parts) != 2 { + return nil, fmt.Errorf("invalid format for address cache size pair: %s", pair) + } + + // Parse address + addressStr := strings.TrimSpace(parts[0]) + if !strings.HasPrefix(addressStr, "0x") { + addressStr = "0x" + addressStr + } + address := common.HexToAddress(addressStr) + + // Parse size in MB and convert to bytes + sizeMB, err := strconv.Atoi(strings.TrimSpace(parts[1])) + if err != nil { + return nil, fmt.Errorf("invalid size for address %s: %v (must be integer MB)", addressStr, err) + } + + // Convert MB to bytes + sizeBytes := sizeMB * 1024 * 1024 + result[address] = sizeBytes + } + + return result, nil +} + var ( clientIdentifier = "bor" gitCommit = "" // Git SHA1 commit hash of the release (set via linker flags) diff --git a/internal/cli/server/flags.go b/internal/cli/server/flags.go index 35c75d909e..49ce292f40 100644 --- a/internal/cli/server/flags.go +++ b/internal/cli/server/flags.go @@ -468,6 +468,13 @@ func (c *Command) Flags(config *Config) *flagset.Flagset { Default: c.cliConfig.Cache.Preimages, Group: "Cache", }) + f.StringFlag(&flagset.StringFlag{ + Name: "cache.addresscachesizes", + Usage: "Address-specific cache sizes for biased caching in MB (format: address=sizeMB,address=sizeMB, e.g. 0x1234...=1024,0x5678...=512)", + Value: &c.cliConfig.Cache.AddressCacheSizesRaw, + Default: c.cliConfig.Cache.AddressCacheSizesRaw, + Group: "Cache", + }) f.Uint64Flag(&flagset.Uint64Flag{ Name: "cache.triesinmemory", Usage: "Number of block states (tries) to keep in memory", diff --git a/triedb/pathdb/biased_fastcache.go b/triedb/pathdb/biased_fastcache.go new file mode 100644 index 0000000000..405ad3a2b6 --- /dev/null +++ b/triedb/pathdb/biased_fastcache.go @@ -0,0 +1,331 @@ +package pathdb + +import ( + stdcontext "context" + "fmt" + "sync" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + // Biased cache metrics for address-specific cache effectiveness + biasedAddressCacheHitMeter = metrics.NewRegisteredMeter("pathdb/biased/address/hit", nil) + biasedAddressCacheMissMeter = metrics.NewRegisteredMeter("pathdb/biased/address/miss", nil) + biasedAddressCacheReadMeter = metrics.NewRegisteredMeter("pathdb/biased/address/read", nil) + biasedAddressCacheWriteMeter = metrics.NewRegisteredMeter("pathdb/biased/address/write", nil) +) + +// AddressBiasedCache is a wrapper around fastcache that maintains separate +// caches for specific addresses and a common cache for everything else. +// It preloads storage trie nodes for specified addresses into dedicated caches. +type AddressBiasedCache struct { + // Address-specific caches, one per preloaded address + addressCaches sync.Map // map[common.Hash]*fastcache.Cache + + // Common cache for all other data + commonCache *fastcache.Cache + + // Set of preloaded addresses for fast lookup + preloadedAddrs sync.Map // map[common.Hash]struct{} + + // RW mutex to protect cache operations and prevent race conditions + // between async preloading and concurrent reads/writes + mu sync.RWMutex + + // Context for canceling preload operations + ctx stdcontext.Context + cancel stdcontext.CancelFunc + wg sync.WaitGroup // Wait for all preloads to finish +} + +// NewAddressBiasedCache creates a new address-biased cache with preloading. +// It scans the database for storage trie nodes of the specified addresses and +// loads them into dedicated caches. The addressCacheSizes maps each address to +// its desired cache size in bytes. The commonCacheSize specifies the size +// of the cache for non-preloaded data. +// Preloading happens asynchronously in the background. +func NewAddressBiasedCache(db ethdb.Database, addressCacheSizes map[common.Address]int, commonCacheSize int) (*AddressBiasedCache, error) { + ctx, cancel := stdcontext.WithCancel(stdcontext.Background()) + cache := &AddressBiasedCache{ + commonCache: fastcache.New(commonCacheSize), + ctx: ctx, + cancel: cancel, + } + + // Initialize caches synchronously, but preload asynchronously + for addr, cacheSize := range addressCacheSizes { + cache.initAddressCache(addr, cacheSize) + + // Start async preloading + cache.wg.Add(1) + go cache.preloadAddressAsync(db, addr, cacheSize) + } + + return cache, nil +} + +// initAddressCache initializes the cache structures for an address synchronously +func (c *AddressBiasedCache) initAddressCache(addr common.Address, cacheSize int) { + accountHash := crypto.Keccak256Hash(addr.Bytes()) + addrCache := fastcache.New(cacheSize) + + // Mark this address as preloaded + c.preloadedAddrs.Store(accountHash, struct{}{}) + c.addressCaches.Store(accountHash, addrCache) +} + +// preloadAddressAsync loads storage trie nodes for the given account hash using +// BFS traversal, prioritizing shallow nodes (most frequently accessed) until +// the cache is full. This naturally loads nodes by depth, filling the cache +// with as many upper-level nodes as possible. This function runs asynchronously. +func (c *AddressBiasedCache) preloadAddressAsync(db ethdb.Database, addr common.Address, cacheSize int) { + defer c.wg.Done() + startTime := time.Now() + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + + // Get the address cache + cacheValue, ok := c.addressCaches.Load(accountHash) + if !ok { + log.Error("Address cache not found during preload", "address", addr.Hex()) + return + } + addrCache := cacheValue.(*fastcache.Cache) + + // Local stats for logging progress + var entriesLoaded int + var totalBytesLoaded uint64 + + log.Info("Starting storage trie preload", + "address", addr.Hex(), + "account hash", accountHash.Hex(), + "cache size", common.StorageSize(cacheSize).String()) + + var maxDepthReached int + const logInterval = 100000 + + // BFS traversal to load nodes by depth until cache is full + type queueItem struct { + path []byte + depth int + } + queue := []queueItem{{path: nil, depth: 0}} // Start from root + visited := make(map[string]struct{}) // Prevent revisiting nodes + + for len(queue) > 0 { + // Check for shutdown signal periodically + select { + case <-c.ctx.Done(): + log.Info("Preload interrupted by shutdown", + "account hash", accountHash.Hex(), + "entries", entriesLoaded, + "max depth", maxDepthReached, + "size", common.StorageSize(totalBytesLoaded).String(), + "elapsed", time.Since(startTime)) + return + default: + } + + item := queue[0] + queue = queue[1:] + + // Track maximum depth reached + if item.depth > maxDepthReached { + maxDepthReached = item.depth + } + + // Skip if already visited + pathKey := string(item.path) + if _, ok := visited[pathKey]; ok { + continue + } + visited[pathKey] = struct{}{} + + // Read the node from database + nodeData := rawdb.ReadStorageTrieNode(db, accountHash, item.path) + if len(nodeData) == 0 { + // Node doesn't exist, skip + continue + } + + // Check if adding this node would exceed cache size + // Key format: owner (32 bytes) + path + nodeSize := uint64(common.HashLength + len(item.path) + len(nodeData)) + + // Preload 66.6% of the cache size to allow hot paths to be added later + if totalBytesLoaded+nodeSize > uint64(cacheSize*2/3) { + log.Info("Cache size limit reached, stopping preload", + "account hash", accountHash.Hex(), + "entries", entriesLoaded, + "current depth", item.depth, + "max depth reached", maxDepthReached, + "size", common.StorageSize(totalBytesLoaded).String()) + break + } + + // Construct the cache key using the same format as nodeCacheKey + // Format: owner (32 bytes) + path + key := append(accountHash.Bytes(), item.path...) + + // Atomically check-and-set with mutex protection to prevent race conditions. + // We must hold the lock across both the check and the set to guarantee that + // no concurrent write from the main execution path can occur between them. + c.mu.Lock() + if addrCache.Has(key) { + // Key already exists, skip to avoid overwriting potentially newer data + c.mu.Unlock() + continue + } + + // Store in cache while holding the lock + addrCache.Set(key, nodeData) + + // Update counters while still holding the lock to prevent races + entriesLoaded++ + totalBytesLoaded += nodeSize + + c.mu.Unlock() + + // Log progress periodically + if entriesLoaded%logInterval == 0 { + log.Info("Preloading storage trie progress", + "account hash", accountHash.Hex(), + "entries", entriesLoaded, + "current depth", item.depth, + "max depth", maxDepthReached, + "size", common.StorageSize(totalBytesLoaded).String(), + "cache usage", fmt.Sprintf("%.1f%%", float64(totalBytesLoaded)*100/float64(cacheSize)), + "elapsed", time.Since(startTime)) + } + + // Add child nodes to queue for next level + childPaths := c.gatherChildPaths(nodeData, item.path) + for _, childPath := range childPaths { + queue = append(queue, queueItem{ + path: childPath, + depth: item.depth + 1, + }) + } + } + + // Log the completion + loadTime := time.Since(startTime) + log.Info("Completed storage trie preload", + "account hash", accountHash.Hex(), + "entries", entriesLoaded, + "max depth", maxDepthReached, + "size", common.StorageSize(totalBytesLoaded).String(), + "cache usage", fmt.Sprintf("%.1f%%", float64(totalBytesLoaded)*100/float64(cacheSize)), + "time", loadTime) +} + +// gatherChildPaths uses ForGatherChildren to extract child node paths from a trie node. +// It decodes the node and collects paths for all child nodes that need to be loaded. +func (c *AddressBiasedCache) gatherChildPaths(nodeData []byte, currentPath []byte) [][]byte { + var childPaths [][]byte + for i := byte(0); i < 16; i++ { + childPath := append(append([]byte(nil), currentPath...), i) + childPaths = append(childPaths, childPath) + } + + return childPaths +} + +// routeCache determines which cache should be used for the given key. +// Returns the appropriate cache and true if it's an address-specific cache, +// or the common cache and false otherwise. +// +// Note: The key format used by nodeCacheKey is: +// - For account trie: path only +// - For storage trie: owner (32 bytes) + path +func (c *AddressBiasedCache) routeCache(key []byte) (*fastcache.Cache, bool) { + if len(key) >= common.HashLength { + accountHash := common.BytesToHash(key[:common.HashLength]) + if cache, ok := c.addressCaches.Load(accountHash); ok { + return cache.(*fastcache.Cache), true + } + } + + return c.commonCache, false +} + +// Get retrieves the value for the given key from the appropriate cache +func (c *AddressBiasedCache) Get(key []byte) []byte { + c.mu.RLock() + defer c.mu.RUnlock() + + cache, isAddressCache := c.routeCache(key) + value := cache.Get(nil, key) + + if isAddressCache { + if len(value) > 0 { + biasedAddressCacheHitMeter.Mark(1) + biasedAddressCacheReadMeter.Mark(int64(len(value))) + } else { + biasedAddressCacheMissMeter.Mark(1) + } + } + + return value +} + +// Set stores the key-value pair in the appropriate cache +func (c *AddressBiasedCache) Set(key, value []byte) { + c.mu.Lock() + defer c.mu.Unlock() + + cache, isAddressCache := c.routeCache(key) + cache.Set(key, value) + + if isAddressCache { + biasedAddressCacheWriteMeter.Mark(int64(len(value))) + } +} + +// Has checks if the key exists in the appropriate cache +func (c *AddressBiasedCache) Has(key []byte) bool { + c.mu.RLock() + defer c.mu.RUnlock() + + cache, _ := c.routeCache(key) + return cache.Has(key) +} + +// Del removes the key from the appropriate cache +func (c *AddressBiasedCache) Del(key []byte) { + c.mu.Lock() + defer c.mu.Unlock() + + cache, _ := c.routeCache(key) + cache.Del(key) +} + +// Reset resets all caches +func (c *AddressBiasedCache) Reset() { + c.mu.Lock() + defer c.mu.Unlock() + + c.commonCache.Reset() + c.addressCaches.Range(func(key, value any) bool { + cache := value.(*fastcache.Cache) + cache.Reset() + return true + }) +} + +// Close cancels all background preload operations and waits for them to finish. +// This ensures graceful shutdown and prevents goroutines from blocking application termination. +func (c *AddressBiasedCache) Close() { + if c.cancel != nil { + c.cancel() // Signal all goroutines to stop + c.wg.Wait() // Wait for them to finish + } +} diff --git a/triedb/pathdb/biased_fastcache_test.go b/triedb/pathdb/biased_fastcache_test.go new file mode 100644 index 0000000000..7570fc747f --- /dev/null +++ b/triedb/pathdb/biased_fastcache_test.go @@ -0,0 +1,565 @@ +package pathdb + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/VictoriaMetrics/fastcache" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/crypto" +) + +func TestAddressBiasedCache_RouteCache(t *testing.T) { + addr1 := common.HexToAddress("0x1234567890123456789012345678901234567890") + addr2 := common.HexToAddress("0xabcdefabcdefabcdefabcdefabcdefabcdefabcd") + + addressCacheSizes := map[common.Address]int{ + addr1: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Test routing for preloaded address + accountHash1 := crypto.Keccak256Hash(addr1.Bytes()) + key1 := accountHash1.Bytes() + + targetCache, isAddressCache := cache.routeCache(key1) + if !isAddressCache { + t.Error("Expected address-specific cache for preloaded address") + } + expectedCache, _ := cache.addressCaches.Load(accountHash1) + if targetCache != expectedCache.(*fastcache.Cache) { + t.Error("Incorrect cache returned for preloaded address") + } + + // Test routing for non-preloaded address + accountHash2 := crypto.Keccak256Hash(addr2.Bytes()) + key2 := accountHash2.Bytes() + + targetCache, isAddressCache = cache.routeCache(key2) + if isAddressCache { + t.Error("Expected common cache for non-preloaded address") + } + if targetCache != cache.commonCache { + t.Error("Incorrect cache returned for non-preloaded address") + } + + // Test routing for short key (account trie) + shortKey := []byte{0x01, 0x02} + targetCache, isAddressCache = cache.routeCache(shortKey) + if isAddressCache { + t.Error("Expected common cache for short key") + } + if targetCache != cache.commonCache { + t.Error("Incorrect cache returned for short key") + } +} + +func TestAddressBiasedCache_GetSet(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + key := append(accountHash.Bytes(), []byte{0x01, 0x02}...) + value := []byte("test value") + + // Test Set and Get for address-specific cache + cache.Set(key, value) + retrieved := cache.Get(key) + if !bytes.Equal(retrieved, value) { + t.Errorf("Expected %v, got %v", value, retrieved) + } + + // Test Set and Get for common cache + commonKey := []byte{0x01, 0x02} + commonValue := []byte("common value") + cache.Set(commonKey, commonValue) + retrieved = cache.Get(commonKey) + if !bytes.Equal(retrieved, commonValue) { + t.Errorf("Expected %v, got %v", commonValue, retrieved) + } + + // Test Get for non-existent key + nonExistentKey := append(accountHash.Bytes(), []byte{0xff, 0xff}...) + retrieved = cache.Get(nonExistentKey) + if len(retrieved) != 0 { + t.Errorf("Expected empty slice, got %v", retrieved) + } +} + +func TestAddressBiasedCache_Has(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + key := append(accountHash.Bytes(), []byte{0x01, 0x02}...) + value := []byte("test value") + + // Test Has for non-existent key + if cache.Has(key) { + t.Error("Has should return false for non-existent key") + } + + // Test Has for existing key + cache.Set(key, value) + if !cache.Has(key) { + t.Error("Has should return true for existing key") + } +} + +func TestAddressBiasedCache_Del(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + key := append(accountHash.Bytes(), []byte{0x01, 0x02}...) + value := []byte("test value") + + // Set and verify + cache.Set(key, value) + if !cache.Has(key) { + t.Error("Key should exist after Set") + } + + // Delete and verify + cache.Del(key) + if cache.Has(key) { + t.Error("Key should not exist after Del") + } + + // Verify Get returns empty after Del + retrieved := cache.Get(key) + if len(retrieved) != 0 { + t.Error("Get should return empty slice after Del") + } +} + +func TestAddressBiasedCache_Reset(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Add data to both address-specific and common caches + accountHash := crypto.Keccak256Hash(addr.Bytes()) + addressKey := append(accountHash.Bytes(), []byte{0x01, 0x02}...) + commonKey := []byte{0x01, 0x02} + + cache.Set(addressKey, []byte("address value")) + cache.Set(commonKey, []byte("common value")) + + // Verify data exists + if !cache.Has(addressKey) || !cache.Has(commonKey) { + t.Error("Data should exist before reset") + } + + // Reset all caches + cache.Reset() + + // Verify data is gone + if cache.Has(addressKey) || cache.Has(commonKey) { + t.Error("Data should not exist after reset") + } +} + +func TestAddressBiasedCache_MultipleAddresses(t *testing.T) { + addr1 := common.HexToAddress("0x1111111111111111111111111111111111111111") + addr2 := common.HexToAddress("0x2222222222222222222222222222222222222222") + addr3 := common.HexToAddress("0x3333333333333333333333333333333333333333") + + addressCacheSizes := map[common.Address]int{ + addr1: 1024 * 1024, + addr2: 512 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 256*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Verify correct number of address caches + var count int + cache.addressCaches.Range(func(key, value any) bool { + count++ + return true + }) + if count != 2 { + t.Errorf("Expected 2 address caches, got %d", count) + } + + // Test data isolation between caches + accountHash1 := crypto.Keccak256Hash(addr1.Bytes()) + accountHash2 := crypto.Keccak256Hash(addr2.Bytes()) + accountHash3 := crypto.Keccak256Hash(addr3.Bytes()) + + key1 := append(accountHash1.Bytes(), []byte{0x01}...) + key2 := append(accountHash2.Bytes(), []byte{0x01}...) + key3 := append(accountHash3.Bytes(), []byte{0x01}...) + + cache.Set(key1, []byte("value1")) + cache.Set(key2, []byte("value2")) + cache.Set(key3, []byte("value3")) + + // Verify values are isolated + val1 := cache.Get(key1) + val2 := cache.Get(key2) + val3 := cache.Get(key3) + + if !bytes.Equal(val1, []byte("value1")) { + t.Error("Value1 mismatch") + } + if !bytes.Equal(val2, []byte("value2")) { + t.Error("Value2 mismatch") + } + if !bytes.Equal(val3, []byte("value3")) { + t.Error("Value3 mismatch (should be in common cache)") + } +} + +func TestAddressBiasedCache_PreloadWithData(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + accountHash := crypto.Keccak256Hash(addr.Bytes()) + + // Create database with some storage trie nodes + db := rawdb.NewMemoryDatabase() + + // Write root node + rootData := []byte("root node data") + rawdb.WriteStorageTrieNode(db, accountHash, nil, rootData) + + // Write child nodes at depth 1 + for i := byte(0); i < 4; i++ { + path := []byte{i} + data := []byte("child node " + string(rune(i))) + rawdb.WriteStorageTrieNode(db, accountHash, path, data) + } + + // Create cache with preloading + addressCacheSizes := map[common.Address]int{ + addr: 10 * 1024, // Small cache to test limit + } + + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Wait for async preloading to complete + time.Sleep(100 * time.Millisecond) + + // Verify root node was loaded + rootKey := accountHash.Bytes() + if !cache.Has(rootKey) { + t.Error("Expected root node to be preloaded") + } + retrieved := cache.Get(rootKey) + if !bytes.Equal(retrieved, rootData) { + t.Error("Root node data mismatch") + } +} + +func TestAddressBiasedCache_GatherChildPaths(t *testing.T) { + cache := &AddressBiasedCache{} + + nodeData := []byte("dummy node data") + currentPath := []byte{0x01} + + childPaths := cache.gatherChildPaths(nodeData, currentPath) + + // Verify 16 child paths are generated (one for each nibble) + if len(childPaths) != 16 { + t.Errorf("Expected 16 child paths, got %d", len(childPaths)) + } + + // Verify each child path is correct + for i := byte(0); i < 16; i++ { + expectedPath := append([]byte{0x01}, i) + if !bytes.Equal(childPaths[i], expectedPath) { + t.Errorf("Child path %d mismatch: expected %v, got %v", i, expectedPath, childPaths[i]) + } + } + + // Test with empty current path + childPaths = cache.gatherChildPaths(nodeData, nil) + if len(childPaths) != 16 { + t.Errorf("Expected 16 child paths for root, got %d", len(childPaths)) + } + for i := byte(0); i < 16; i++ { + expectedPath := []byte{i} + if !bytes.Equal(childPaths[i], expectedPath) { + t.Errorf("Root child path %d mismatch: expected %v, got %v", i, expectedPath, childPaths[i]) + } + } +} + +func TestAddressBiasedCache_EmptyDatabase(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + _, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Wait for async preloading to complete + time.Sleep(50 * time.Millisecond) + + // Cache created successfully for empty database +} + +func TestAddressBiasedCache_AsyncPreloadWithConcurrentWrites(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + accountHash := crypto.Keccak256Hash(addr.Bytes()) + + // Create database with storage trie nodes + db := rawdb.NewMemoryDatabase() + + // Write root node + rootData := []byte("root node data") + rawdb.WriteStorageTrieNode(db, accountHash, nil, rootData) + + // Write some child nodes + for i := byte(0); i < 10; i++ { + path := []byte{i} + data := []byte("node data " + string(rune(i))) + rawdb.WriteStorageTrieNode(db, accountHash, path, data) + } + + // Create cache with async preloading + addressCacheSizes := map[common.Address]int{ + addr: 100 * 1024, + } + + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Immediately start writing to the cache while preloading is happening + // Use a key that doesn't exist in the database + testKey := append(accountHash.Bytes(), byte(5)) + manualValue := []byte("manually added value") + cache.Set(testKey, manualValue) + + // Wait for async preloading to complete + time.Sleep(100 * time.Millisecond) + + // Verify the manually added value or DB value exists + retrieved := cache.Get(testKey) + // The value could be either manual or from DB, just check it exists + if len(retrieved) == 0 { + t.Error("Expected key to have a value") + } +} + +func TestAddressBiasedCache_ConcurrentAccess(t *testing.T) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 512*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + + // Test concurrent reads and writes + done := make(chan bool) + numGoroutines := 10 + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + key := append(accountHash.Bytes(), byte(id)) + value := []byte("value " + string(rune(id))) + + // Perform multiple operations + for j := 0; j < 100; j++ { + cache.Set(key, value) + cache.Get(key) + cache.Has(key) + if j%10 == 0 { + cache.Del(key) + } + } + done <- true + }(i) + } + + // Wait for all goroutines to complete with timeout + timeout := time.After(5 * time.Second) + for i := 0; i < numGoroutines; i++ { + select { + case <-done: + case <-timeout: + t.Fatal("Test timed out waiting for concurrent operations") + } + } +} + +func BenchmarkAddressBiasedCache_Get_AddressCache(b *testing.B) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 10 * 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 5*1024*1024) + if err != nil { + b.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + key := append(accountHash.Bytes(), []byte{0x01, 0x02}...) + value := []byte("benchmark value") + cache.Set(key, value) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get(key) + } +} + +func BenchmarkAddressBiasedCache_Get_CommonCache(b *testing.B) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 10 * 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 5*1024*1024) + if err != nil { + b.Fatalf("Failed to create cache: %v", err) + } + + key := []byte{0x01, 0x02} + value := []byte("benchmark value") + cache.Set(key, value) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.Get(key) + } +} + +func BenchmarkAddressBiasedCache_Set(b *testing.B) { + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + addressCacheSizes := map[common.Address]int{ + addr: 10 * 1024 * 1024, + } + + db := rawdb.NewMemoryDatabase() + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 5*1024*1024) + if err != nil { + b.Fatalf("Failed to create cache: %v", err) + } + + accountHash := crypto.Keccak256Hash(addr.Bytes()) + value := []byte("benchmark value") + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := append(accountHash.Bytes(), byte(i%256)) + cache.Set(key, value) + } +} + +// TestAddressBiasedCache_GracefulShutdown tests that Close() properly stops +// background preload operations and waits for them to finish. +func TestAddressBiasedCache_GracefulShutdown(t *testing.T) { + db := rawdb.NewMemoryDatabase() + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + accountHash := crypto.Keccak256Hash(addr.Bytes()) + + // Create a large tree of storage trie nodes that will take some time to preload + nodeCount := 1000 + for i := 0; i < nodeCount; i++ { + path := []byte{byte(i % 256), byte(i / 256)} + nodeData := []byte(fmt.Sprintf("node-data-%d", i)) + rawdb.WriteStorageTrieNode(db, accountHash, path, nodeData) + } + + // Create cache with preloading + addressCacheSizes := map[common.Address]int{ + addr: 10 * 1024 * 1024, // 10 MB + } + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 1024*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Immediately close the cache to test interruption + cache.Close() + + // Verify the cache is still functional after Close() + key := append(accountHash.Bytes(), []byte{1, 2}...) + cache.Set(key, []byte("test-value")) + value := cache.Get(key) + if string(value) != "test-value" { + t.Errorf("Cache should still work after Close(), got: %s", string(value)) + } +} + +// TestAddressBiasedCache_MultipleClose tests that calling Close() multiple times +// doesn't cause issues. +func TestAddressBiasedCache_MultipleClose(t *testing.T) { + db := rawdb.NewMemoryDatabase() + addr := common.HexToAddress("0x1234567890123456789012345678901234567890") + + addressCacheSizes := map[common.Address]int{ + addr: 1024 * 1024, // 1 MB + } + cache, err := NewAddressBiasedCache(db, addressCacheSizes, 1024*1024) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + // Close multiple times should not panic + cache.Close() + cache.Close() + cache.Close() +} diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index 138962110f..cc1ff78c94 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -132,7 +132,7 @@ func (b *buffer) size() uint64 { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. -func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) { +func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache *AddressBiasedCache, statesCache *fastcache.Cache, id uint64, postFlush func()) { if b.done != nil { panic("duplicated flush operation") } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 87bee51561..61ab0c98c4 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -120,6 +120,10 @@ type Config struct { ReadOnly bool // Flag whether the database is opened in read only mode MaxDiffLayers uint64 // Maximum diff layers allowed in the layer tree. + // Address-specific cache configuration for biased caching + // Maps account address to cache size in bytes + AddressCacheSizes map[common.Address]int + // Testing configurations SnapshotNoBuild bool // Flag Whether the state generation is allowed NoAsyncFlush bool // Flag whether the background buffer flushing is allowed diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 06f0a7285f..d228bebc26 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -38,8 +38,8 @@ type diskLayer struct { // These two caches must be maintained separately, because the key // for the root node of the storage trie (accountHash) is identical // to the key for the account data. - nodes *fastcache.Cache // GC friendly memory cache of clean nodes - states *fastcache.Cache // GC friendly memory cache of clean states + nodes *AddressBiasedCache // GC friendly memory cache of clean nodes + states *fastcache.Cache // GC friendly memory cache of clean states buffer *buffer // Live buffer to aggregate writes frozen *buffer // Frozen node buffer waiting for flushing @@ -54,12 +54,16 @@ type diskLayer struct { } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer { +func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *AddressBiasedCache, states *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer { // Initialize the clean caches if the memory allowance is not zero // or reuse the provided caches if they are not nil (inherited from // the original disk layer). if nodes == nil && db.config.TrieCleanSize != 0 { - nodes = fastcache.New(db.config.TrieCleanSize) + cachedNodes, err := NewAddressBiasedCache(db.diskdb, db.config.AddressCacheSizes, db.config.TrieCleanSize) + if err != nil { + panic(err) + } + nodes = cachedNodes } if states == nil && db.config.StateCleanSize != 0 { states = fastcache.New(db.config.StateCleanSize) @@ -137,7 +141,7 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co // Try to retrieve the trie node from the clean memory cache key := nodeCacheKey(owner, path) if dl.nodes != nil { - if blob := dl.nodes.Get(nil, key); len(blob) > 0 { + if blob := dl.nodes.Get(key); len(blob) > 0 { cleanNodeHitMeter.Mark(1) cleanNodeReadMeter.Mark(int64(len(blob))) return blob, crypto.Keccak256Hash(blob), &nodeLoc{loc: locCleanCache, depth: depth}, nil @@ -605,7 +609,8 @@ func (dl *diskLayer) waitFlush() error { } // terminate releases the frozen buffer if it's not nil and terminates the -// background state generator. +// background state generator. It also stops any background preload operations +// in the address-biased cache. func (dl *diskLayer) terminate() error { dl.lock.Lock() defer dl.lock.Unlock() @@ -619,5 +624,9 @@ func (dl *diskLayer) terminate() error { if dl.generator != nil { dl.generator.stop() } + // Stop background preload operations in the address-biased cache + if dl.nodes != nil { + dl.nodes.Close() + } return nil } diff --git a/triedb/pathdb/flush.go b/triedb/pathdb/flush.go index 6563dbccff..4f8e9edbfa 100644 --- a/triedb/pathdb/flush.go +++ b/triedb/pathdb/flush.go @@ -38,7 +38,7 @@ func nodeCacheKey(owner common.Hash, path []byte) []byte { // writeNodes writes the trie nodes into the provided database batch. // Note this function will also inject all the newly written nodes // into clean cache. -func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *fastcache.Cache) (total int) { +func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.Node, clean *AddressBiasedCache) (total int) { for owner, subset := range nodes { for path, n := range subset { if n.IsDeleted() { diff --git a/triedb/pathdb/nodes.go b/triedb/pathdb/nodes.go index f90bd0f01c..dd1d6a542d 100644 --- a/triedb/pathdb/nodes.go +++ b/triedb/pathdb/nodes.go @@ -22,7 +22,6 @@ import ( "io" "maps" - "github.com/VictoriaMetrics/fastcache" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/crypto" @@ -274,7 +273,7 @@ func (s *nodeSet) decode(r *rlp.Stream) error { } // write flushes nodes into the provided database batch as a whole. -func (s *nodeSet) write(batch ethdb.Batch, clean *fastcache.Cache) int { +func (s *nodeSet) write(batch ethdb.Batch, clean *AddressBiasedCache) int { nodes := make(map[common.Hash]map[string]*trienode.Node) if len(s.accountNodes) > 0 { nodes[common.Hash{}] = s.accountNodes