diff --git a/core/state/database.go b/core/state/database.go index ecc2c134da6c..ccdf6cd715d1 100644 --- a/core/state/database.go +++ b/core/state/database.go @@ -69,7 +69,8 @@ type Trie interface { // by the caller while they are stored in the trie. If a node was not found in the // database, a trie.MissingNodeError is returned. TryUpdate(key, value []byte) error - + BatchStart() + BatchEnd() // TryDelete removes any existing value for key from the trie. If a node was not // found in the database, a trie.MissingNodeError is returned. TryDelete(key []byte) error @@ -82,6 +83,8 @@ type Trie interface { // and external (for account tries) references. Commit(onleaf trie.LeafCallback) (common.Hash, error) + CommitTo(onleaf trie.LeafCallback, dbi *trie.DbInserter) (common.Hash, error) + // NodeIterator returns an iterator that returns nodes of the trie. Iteration // starts at the key after the given start key. NodeIterator(startKey []byte) trie.NodeIterator diff --git a/core/state/state_object.go b/core/state/state_object.go index 8680de021f42..93174edefd79 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -19,6 +19,7 @@ package state import ( "bytes" "fmt" + "github.com/ethereum/go-ethereum/trie" "io" "math/big" "time" @@ -272,7 +273,7 @@ func (s *stateObject) finalise() { } // updateTrie writes cached storage modifications into the object's storage trie. -func (s *stateObject) updateTrie(db Database) Trie { +func (s *stateObject) updateTrie(tr Trie) Trie { // Make sure all dirty slots are finalized into the pending storage area s.finalise() @@ -281,7 +282,6 @@ func (s *stateObject) updateTrie(db Database) Trie { defer func(start time.Time) { s.db.StorageUpdates += time.Since(start) }(time.Now()) } // Insert all the pending updates into the trie - tr := s.getTrie(db) for key, value := range s.pendingStorage { // Skip noop changes, persist actual changes if value == s.originStorage[key] { @@ -304,8 +304,8 @@ func (s *stateObject) updateTrie(db Database) Trie { } // UpdateRoot sets the trie root to the current root hash of -func (s *stateObject) updateRoot(db Database) { - s.updateTrie(db) +func (s *stateObject) updateRoot(tr Trie) { + s.updateTrie(tr) // Track the amount of time wasted on hashing the storge trie if metrics.EnabledExpensive { @@ -316,8 +316,8 @@ func (s *stateObject) updateRoot(db Database) { // CommitTrie the storage trie of the object to db. // This updates the trie root. -func (s *stateObject) CommitTrie(db Database) error { - s.updateTrie(db) +func (s *stateObject) CommitTrie(tr Trie) error { + s.updateTrie(tr) if s.dbErr != nil { return s.dbErr } @@ -332,6 +332,22 @@ func (s *stateObject) CommitTrie(db Database) error { return err } +func (s *stateObject) CommitTrieTo(tr Trie, inserter *trie.DbInserter) error { + s.updateTrie(tr) + if s.dbErr != nil { + return s.dbErr + } + // Track the amount of time wasted on committing the storge trie + if metrics.EnabledExpensive { + defer func(start time.Time) { s.db.StorageCommits += time.Since(start) }(time.Now()) + } + root, err := s.trie.CommitTo(nil, inserter) + if err == nil { + s.data.Root = root + } + return err +} + // AddBalance removes amount from c's balance. // It is used to add funds to the destination account of a transfer. func (s *stateObject) AddBalance(amount *big.Int) { diff --git a/core/state/statedb.go b/core/state/statedb.go index 03e118d117ec..7511b26b4eaa 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -18,6 +18,7 @@ package state import ( + "bytes" "errors" "fmt" "math/big" @@ -330,7 +331,7 @@ func (s *StateDB) StorageTrie(addr common.Address) Trie { return nil } cpy := stateObject.deepCopy(s) - return cpy.updateTrie(s.db) + return cpy.updateTrie(cpy.getTrie(s.db)) } func (s *StateDB) HasSuicided(addr common.Address) bool { @@ -688,16 +689,17 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash { // Finalise all the dirty storage states and write them into the tries s.Finalise(deleteEmptyObjects) - + s.trie.BatchStart() for addr := range s.stateObjectsPending { obj := s.stateObjects[addr] if obj.deleted { s.deleteStateObject(obj) } else { - obj.updateRoot(s.db) + obj.updateRoot(obj.getTrie(s.db)) s.updateStateObject(obj) } } + s.trie.BatchEnd() if len(s.stateObjectsPending) > 0 { s.stateObjectsPending = make(map[common.Address]struct{}) } @@ -729,20 +731,30 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { // Finalize any pending changes and merge everything into the tries s.IntermediateRoot(deleteEmptyObjects) + // The commit phase. We start by committing the account storage tries + // + // Start the dedicated inserter + dbi := trie.StartDBInserter(s.db.TrieDB()) + // Commit objects to the trie, measuring the elapsed time for addr := range s.stateObjectsDirty { if obj := s.stateObjects[addr]; !obj.deleted { // Write any contract code associated with the state object if obj.code != nil && obj.dirtyCode { - s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code) + dbi.InsertBlob(obj.code, common.BytesToHash(obj.CodeHash())) + //s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code) obj.dirtyCode = false } // Write any storage changes in the state object to its storage trie - if err := obj.CommitTrie(s.db); err != nil { + if err := obj.CommitTrieTo(obj.getTrie(s.db), dbi); err != nil { return common.Hash{}, err } } } + // Wait for storage update to punch through + //dbi.WaitForEmpty() + // .. or not + if len(s.stateObjectsDirty) > 0 { s.stateObjectsDirty = make(map[common.Address]struct{}) } @@ -750,18 +762,24 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) { if metrics.EnabledExpensive { defer func(start time.Time) { s.AccountCommits += time.Since(start) }(time.Now()) } - return s.trie.Commit(func(leaf []byte, parent common.Hash) error { - var account Account + // The onleaf func is called _serially_, so we can reuse the same account + // for unmarshalling every time. + var account Account + var trieDb = s.db.TrieDB() + h, e := s.trie.CommitTo(func(leaf []byte, parent common.Hash) error { if err := rlp.DecodeBytes(leaf, &account); err != nil { return nil } if account.Root != emptyRoot { - s.db.TrieDB().Reference(account.Root, parent) + trieDb.Reference(account.Root, parent) } - code := common.BytesToHash(account.CodeHash) - if code != emptyCode { - s.db.TrieDB().Reference(code, parent) + if !bytes.Equal(emptyCodeHash, account.CodeHash) { + trieDb.Reference(common.BytesToHash(account.CodeHash), parent) } return nil - }) + }, dbi) + // Close it, and wait for empty + dbi.Close() + + return h, e } diff --git a/light/trie.go b/light/trie.go index e512bf6f9562..8a47435ee59b 100644 --- a/light/trie.go +++ b/light/trie.go @@ -112,6 +112,9 @@ func (t *odrTrie) TryUpdate(key, value []byte) error { }) } +func (t *odrTrie) BatchStart() {} +func (t *odrTrie) BatchEnd() {} + func (t *odrTrie) TryDelete(key []byte) error { key = crypto.Keccak256(key) return t.do(key, func() error { @@ -126,6 +129,13 @@ func (t *odrTrie) Commit(onleaf trie.LeafCallback) (common.Hash, error) { return t.trie.Commit(onleaf) } +func (t *odrTrie) CommitTo(onleaf trie.LeafCallback, dbi *trie.DbInserter) (common.Hash, error) { + if t.trie == nil { + return t.id.Root, nil + } + return t.trie.Commit(onleaf) +} + func (t *odrTrie) Hash() common.Hash { if t.trie == nil { return t.id.Root diff --git a/trie/database.go b/trie/database.go index b0fd78744492..6f4c379689ed 100644 --- a/trie/database.go +++ b/trie/database.go @@ -314,24 +314,24 @@ func (db *Database) InsertBlob(hash common.Hash, blob []byte) { db.lock.Lock() defer db.lock.Unlock() - db.insert(hash, blob, rawNode(blob)) + db.insert(hash, len(blob), rawNode(blob)) } // insert inserts a collapsed trie node into the memory database. This method is // a more generic version of InsertBlob, supporting both raw blob insertions as -// well ex trie node insertions. The blob must always be specified to allow proper +// well ex trie node insertions. The blob size must be specified to allow proper // size tracking. -func (db *Database) insert(hash common.Hash, blob []byte, node node) { +func (db *Database) insert(hash common.Hash, size int, node node) { // If the node's already cached, skip if _, ok := db.dirties[hash]; ok { return } - memcacheDirtyWriteMeter.Mark(int64(len(blob))) + memcacheDirtyWriteMeter.Mark(int64(size)) // Create the cached entry for this node entry := &cachedNode{ node: simplifyNode(node), - size: uint16(len(blob)), + size: uint16(size), flushPrev: db.newest, } for _, child := range entry.childs() { diff --git a/trie/hasher.go b/trie/hasher.go index 54f6a9de2b6a..210fd81c9c30 100644 --- a/trie/hasher.go +++ b/trie/hasher.go @@ -67,6 +67,7 @@ func newHasher(onleaf LeafCallback) *hasher { } func returnHasherToPool(h *hasher) { + h.onleaf = nil hasherPool.Put(h) } @@ -184,7 +185,7 @@ func (h *hasher) store(n node, db *Database, force bool) (node, error) { hash := common.BytesToHash(hash) db.lock.Lock() - db.insert(hash, h.tmp, n) + db.insert(hash, len(h.tmp), n) db.lock.Unlock() // Track external references from account->storage trie diff --git a/trie/pure_committer.go b/trie/pure_committer.go new file mode 100644 index 000000000000..dad9d566d286 --- /dev/null +++ b/trie/pure_committer.go @@ -0,0 +1,376 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package trie + +import ( + "errors" + "fmt" + "math/rand" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +// Leaf represents a trie leaf value +type Leaf struct { + size int // size of the rlp data (estimate) + hash common.Hash // hash of rlp data + node node // the node to commit + vnodes bool // set to true if the node (possibly) contains a valueNode + onleaf LeafCallback +} + +type committer struct { + tmp sliceBuffer + sha keccakState + + onleaf LeafCallback + leafCh chan *Leaf +} + +// committers live in a global db. +var committerPool = sync.Pool{ + New: func() interface{} { + return &committer{ + tmp: make(sliceBuffer, 0, 550), // cap is as large as a full fullNode. + sha: sha3.NewLegacyKeccak256().(keccakState), + } + }, +} + +func newCommitter(onleaf LeafCallback, leafCh chan *Leaf) *committer { + h := committerPool.Get().(*committer) + h.onleaf = onleaf + h.leafCh = leafCh + if onleaf != nil && leafCh == nil { + h.leafCh = make(chan *Leaf, 200) // arbitrary number + } + return h +} + +func returnCommitterToPool(h *committer) { + h.onleaf = nil + h.leafCh = nil + committerPool.Put(h) +} + +// hash collapses a node down into a hash node, also returning a copy of the +// original node initialized with the computed hash to replace the original one. +func (h *committer) commit(n node, db *Database, force bool) (node, error) { + // If we're not storing the node, just hashing, use available cached data + hash, dirty := n.cache() + if hash != nil && !dirty { + return hash, nil + } + if db == nil { + return nil, errors.New("no db provided") + } + // Commit children. then parent + // Remove the dirty flag. + switch cn := n.(type) { + case *shortNode: + // Commit child + collapsed := cn.copy() + if _, ok := cn.Val.(valueNode); !ok { + if childV, err := h.commit(cn.Val, db, false); err != nil { + return nil, err + } else { + collapsed.Val = childV + } + } + // The key needs to be copied, since we're delivering it to database + collapsed.Key = hexToCompact(cn.Key) + hashedNode := h.store(collapsed, db, force, true) + if hn, ok := hashedNode.(hashNode); ok { + cn.flags.dirty = false + return hn, nil + } else { + return collapsed, nil + } + case *fullNode: + hashedKids, hasVnodes, err := h.commitChildren(cn, db, force) + if err != nil { + return nil, err + } + collapsed := cn.copy() + collapsed.Children = hashedKids + + hashedNode := h.store(collapsed, db, force, hasVnodes) + if hn, ok := hashedNode.(hashNode); ok { + cn.flags.dirty = false + return hn, nil + } else { + return collapsed, nil + } + case valueNode: + return h.store(cn, db, force, false), nil + // hashnodes aren't stored + case hashNode: + return cn, nil + } + return hash, nil +} + +// commitChildren commits the children of the given fullnode +func (h *committer) commitChildren(n *fullNode, db *Database, force bool) ([17]node, bool, error) { + var children [17]node + var hasValueNodeChildren = false + for i, child := range n.Children { + if child == nil { + continue + } + hnode, err := h.commit(child, db, false) + if err != nil { + return children, false, err + } + children[i] = hnode + if _, ok := hnode.(valueNode); ok { + hasValueNodeChildren = true + } + } + return children, hasValueNodeChildren, nil +} + +// store hashes the node n and if we have a storage layer specified, it writes +// the key/value pair to it and tracks any node->child references as well as any +// node->external trie references. +func (h *committer) store(n node, db *Database, force bool, hasVnodeChildren bool) node { + // Larger nodes are replaced by their hash and stored in the database. + var ( + hash, _ = n.cache() + size int + ) + if hash == nil { + if vn, ok := n.(valueNode); ok { + h.tmp.Reset() + if err := rlp.Encode(&h.tmp, vn); err != nil { + panic("encode error: " + err.Error()) + } + size = len(h.tmp) + if size < 32 && !force { + return n // Nodes smaller than 32 bytes are stored inside their parent + } + hash = h.makeHashNode(h.tmp) + } else { + // This was not generated - must be a small node stored in the parent + // No need to do anything here + return n + } + } else { + // We have the hash already, estimate the RLP encoding-size of the node. + // The size is used for mem tracking, does not need to be exact + size = estimateSize(n) + } + // If we're using channel-based leaf-reporting, send to channel. + // The leaf channel will be active only when there an active leaf-callback + if h.leafCh != nil { + h.leafCh <- &Leaf{ + size: size, + hash: common.BytesToHash(hash), + node: n, + vnodes: hasVnodeChildren, + onleaf: h.onleaf, + } + } else if db != nil { + // No leaf-callback used, but there's still a database. Do serial + // insertion + db.lock.Lock() + db.insert(common.BytesToHash(hash), size, n) + db.lock.Unlock() + } + return hash +} + +// commitLoop does the actual insert + leaf callback for nodes +func (h *committer) commitLoop(db *Database, wg *sync.WaitGroup) { + defer wg.Done() + for item := range h.leafCh { + var ( + hash = item.hash + size = item.size + n = item.node + hasVnodes = item.vnodes + ) + // We are pooling the trie nodes into an intermediate memory cache + db.lock.Lock() + db.insert(hash, size, n) + db.lock.Unlock() + if h.onleaf != nil && hasVnodes { + switch n := n.(type) { + case *shortNode: + if child, ok := n.Val.(valueNode); ok { + h.onleaf(child, hash) + } + case *fullNode: + for i := 0; i < 16; i++ { + if child, ok := n.Children[i].(valueNode); ok { + h.onleaf(child, hash) + } + } + } + } + } +} + +func (h *committer) makeHashNode(data []byte) hashNode { + //fmt.Printf("hashing: %x\n", data) + n := make(hashNode, h.sha.Size()) + h.sha.Reset() + h.sha.Write(data) + h.sha.Read(n) + return n +} + +// estimateSize estimates the size of an rlp-encoded node, without actually +// rlp-encoding it (zero allocs). This method has been experimentally tried, and with a trie +// with 1000 leafs, the only errors above 1% are on small shortnodes, where this +// method overestimates by 2 or 3 bytes (e.g. 37 instead of 35) +func estimateSize(n node) int { + switch n := n.(type) { + case *shortNode: + // A short node contains a compacted key, and a value. + return 3 + len(n.Key) + estimateSize(n.Val) + case *fullNode: + // A full node contains up to 16 hashes (some nils), and a key + s := 3 + for i := 0; i < 16; i++ { + if child := n.Children[i]; child != nil { + s += estimateSize(child) + } else { + s += 1 + } + } + return s + case valueNode: + return 1 + len(n) + case hashNode: + return 1 + len(n) + default: + panic(fmt.Sprintf("node type %T", n)) + + } +} + +/** +Todo, we could improve the situation for small trie commits (storage tries), +if we use one dedicated database-inserter, instead of having each one spin up a +separate instance. + +The gain is not only that we save some goroutine start/stop, it's also that +we can process trie M while we're still committing trie N -- since we don't +have to do the waitgroup-wait between each trie commit. + +The code below is a rough sketch, it needs to be integrated nicely without causing +dependency cycles between state, core and trie. + +**/ + +type DbInserter struct { + inputCh chan *Leaf // This is where input to database is sent + reportCh chan int // At certain points, callers wants to know that we're done + db *Database + wg sync.WaitGroup +} + +// commitLoop does the actual insert + leaf callback for nodes +func (dbi *DbInserter) run() { + defer dbi.wg.Done() + for item := range dbi.inputCh { + var ( + hash = item.hash + size = item.size + n = item.node + hasVnodes = item.vnodes + onleaf = item.onleaf + ) + if size < 0 { + // This is an end-marker object. + dbi.reportCh <- size + continue + } + // We are pooling the trie nodes into an intermediate memory cache + dbi.db.lock.Lock() + dbi.db.insert(hash, size, n) + dbi.db.lock.Unlock() + if onleaf != nil && hasVnodes { + switch n := n.(type) { + case *shortNode: + if child, ok := n.Val.(valueNode); ok { + onleaf(child, hash) + } + case *fullNode: + for i := 0; i < 16; i++ { + if child, ok := n.Children[i].(valueNode); ok { + onleaf(child, hash) + } + } + } + } + } +} + +func (dbi *DbInserter) Close() { + close(dbi.inputCh) + dbi.wg.Wait() +} + +func (dbi *DbInserter) Insert(leaf *Leaf) { + dbi.inputCh <- leaf +} + +// WaitForEmpty returns to the caller when all the data currently in the +// channel has been handled +func (dbi *DbInserter) WaitForEmpty() { + // Send an arbitrary id there + checksum := rand.Int() + dbi.inputCh <- &Leaf{ + size: -checksum, + } + // And wait for it to come back + for { + select { + case retval := <-dbi.reportCh: + if retval == checksum { + return + } + + } + } +} + +func (dbi *DbInserter) InsertBlob(blob []byte, blobHash common.Hash) { + dbi.inputCh <- &Leaf{ + size: len(blob), + hash: blobHash, + node: rawNode(blob), + vnodes: false, + } +} + +func StartDBInserter(db *Database) *DbInserter { + + dbi := &DbInserter{ + inputCh: make(chan *Leaf, 200), + reportCh: make(chan int), + db: db, + } + dbi.wg.Add(1) + go dbi.run() + return dbi +} diff --git a/trie/pure_hasher.go b/trie/pure_hasher.go new file mode 100644 index 000000000000..0d2a74e5dbed --- /dev/null +++ b/trie/pure_hasher.go @@ -0,0 +1,176 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package trie + +import ( + "sync" + + "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/crypto/sha3" +) + +type pureHasher struct { + sha keccakState + + tmp sliceBuffer + parallel bool +} + +// hashers live in a global db. +var pureHasherPool = sync.Pool{ + New: func() interface{} { + return &pureHasher{ + tmp: make(sliceBuffer, 0, 550), // cap is as large as a full fullNode. + sha: sha3.NewLegacyKeccak256().(keccakState), + } + }, +} + +func newPureHasher(parallel bool) *pureHasher { + h := pureHasherPool.Get().(*pureHasher) + h.parallel = parallel + return h +} + +func returnPureHasherToPool(h *pureHasher) { + pureHasherPool.Put(h) +} + +// hash collapses a node down into a hash node, also returning a copy of the +// original node initialized with the computed hash to replace the original one. +func (h *pureHasher) hash(n node, force bool) (hashed node, cached node) { + // We're not storing the node, just hashing, use available cached data + if hash, _ := n.cache(); hash != nil { + return hash, n + } + // Trie not processed yet or needs storage, walk the children + switch n := n.(type) { + case *shortNode: + collapsed, cached := h.hashShortNodeChildren(n) + hashed := h.shortnodeToHash(collapsed, force) + // We need to retain the possibly _not_ hashed node, in case it was too + // small to be hashed + if hn, ok := hashed.(hashNode); ok { + cached.flags.hash = hn + } else { + cached.flags.hash = nil + } + return hashed, cached + case *fullNode: + collapsed, cached := h.hashFullNodeChildren(n) + hashed = h.fullnodeToHash(collapsed, force) + if hn, ok := hashed.(hashNode); ok { + cached.flags.hash = hn + } else { + cached.flags.hash = nil + } + return hashed, cached + default: + // Value and hash nodes don't have children so they're left as were + return n, n + } +} + +// hashShortNodeChildren collapses the short node. The returned collapsed node +// holds a live reference to the Key, and must not be modified. +// The cached +func (h *pureHasher) hashShortNodeChildren(n *shortNode) (collapsed, cached *shortNode) { + // Hash the short node's child, caching the newly hashed subtree + collapsed, cached = n.copy(), n.copy() + // Previously, we did copy this one. We don't seem to need to actually + // do that, since we don't overwrite/reuse keys + //cached.Key = common.CopyBytes(n.Key) + collapsed.Key = hexToCompact(n.Key) + // Unless the child is a valuenode or hashnode, hash it + switch n.Val.(type) { + case *fullNode, *shortNode: + collapsed.Val, cached.Val = h.hash(n.Val, false) + } + return collapsed, cached +} + +func (h *pureHasher) hashFullNodeChildren(n *fullNode) (collapsed *fullNode, cached *fullNode) { + // Hash the full node's children, caching the newly hashed subtrees + cached = n.copy() + collapsed = n.copy() + if h.parallel { + var wg sync.WaitGroup + wg.Add(16) + for i := 0; i < 16; i++ { + go func(i int) { + hasher := newPureHasher(false) + if child := n.Children[i]; child != nil { + collapsed.Children[i], cached.Children[i] = hasher.hash(child, false) + } else { + collapsed.Children[i] = nilValueNode + } + wg.Done() + defer returnPureHasherToPool(hasher) + }(i) + } + wg.Wait() + } else { + for i := 0; i < 16; i++ { + if child := n.Children[i]; child != nil { + collapsed.Children[i], cached.Children[i] = h.hash(child, false) + } else { + collapsed.Children[i] = nilValueNode + } + } + } + return collapsed, cached +} + +// shortnodeToHash creates a hashNode from a shortNode. The supplied shortnode +// should have hex-type Key, which will be converted (without modification) +// into compact form for RLP encoding. +// If the rlp data is smaller than 32 bytes, `nil` is returned. +func (h *pureHasher) shortnodeToHash(n *shortNode, force bool) node { + h.tmp.Reset() + if err := rlp.Encode(&h.tmp, n); err != nil { + panic("encode error: " + err.Error()) + } + + if len(h.tmp) < 32 && !force { + return n // Nodes smaller than 32 bytes are stored inside their parent + } + return h.hashData(h.tmp) +} + +// shortnodeToHash is used to creates a hashNode from a set of hashNodes, (which +// may contain nil values) +func (h *pureHasher) fullnodeToHash(n *fullNode, force bool) node { + h.tmp.Reset() + // Generate the RLP encoding of the node + if err := n.EncodeRLP(&h.tmp); err != nil { + panic("encode error: " + err.Error()) + } + + if len(h.tmp) < 32 && !force { + return n // Nodes smaller than 32 bytes are stored inside their parent + } + return h.hashData(h.tmp) +} + +// hashData hashes the provided data +func (h *pureHasher) hashData(data []byte) hashNode { + n := make(hashNode, 32) + h.sha.Reset() + h.sha.Write(data) + h.sha.Read(n) + return n +} diff --git a/trie/secure_trie.go b/trie/secure_trie.go index fbc591ed108a..96bf81581b34 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -109,6 +109,13 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error { return nil } +func (t *SecureTrie) BatchStart(){ + t.trie.batchStart() +} +func (t *SecureTrie) BatchEnd(){ + t.trie.batchEnd() +} + // Delete removes any existing value for key from the trie. func (t *SecureTrie) Delete(key []byte) { if err := t.TryDelete(key); err != nil { @@ -154,6 +161,20 @@ func (t *SecureTrie) Commit(onleaf LeafCallback) (root common.Hash, err error) { return t.trie.Commit(onleaf) } +func (t *SecureTrie) CommitTo(onleaf LeafCallback, inserter *DbInserter ) (root common.Hash, err error) { + // Write all the pre-images to the actual disk database + if len(t.getSecKeyCache()) > 0 { + t.trie.db.lock.Lock() + for hk, key := range t.secKeyCache { + t.trie.db.insertPreimage(common.BytesToHash([]byte(hk)), key) + } + t.trie.db.lock.Unlock() + + t.secKeyCache = make(map[string][]byte) + } + return t.trie.CommitTo(onleaf, inserter) +} + // Hash returns the root hash of SecureTrie. It does not write to the // database and can be used even if the trie doesn't have one. func (t *SecureTrie) Hash() common.Hash { diff --git a/trie/trie.go b/trie/trie.go index 920e331fd62f..bfd3d7733f99 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -20,6 +20,7 @@ package trie import ( "bytes" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -47,6 +48,11 @@ type LeafCallback func(leaf []byte, parent common.Hash) error type Trie struct { db *Database root node + // Keep a rough track of the number of leafs to commit + dirtyCount int + // And leafs to hash + unhashedCount int + batchMode bool } // newFlag returns the cache flag value for a newly created node. @@ -162,6 +168,8 @@ func (t *Trie) Update(key, value []byte) { // // If a node was not found in the database, a MissingNodeError is returned. func (t *Trie) TryUpdate(key, value []byte) error { + t.unhashedCount++ + t.dirtyCount++ k := keybytesToHex(key) if len(value) != 0 { _, n, err := t.insert(t.root, nil, k, valueNode(value)) @@ -179,6 +187,13 @@ func (t *Trie) TryUpdate(key, value []byte) error { return nil } +func (t *Trie) batchStart() { + t.batchMode = true +} +func (t *Trie) batchEnd() { + t.batchMode = false +} + func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) { if len(key) == 0 { if v, ok := n.(valueNode); ok { @@ -221,7 +236,20 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error if !dirty || err != nil { return false, n, err } - n = n.copy() + // If we're in batch-mode, we don't keep 'ephemeral' changes. + // When we modify a node, we only copy it in case it is an old committed + // node. + // If the node is "new", we just update in place. + if t.batchMode { + if h, dirty := n.cache(); !dirty || h != nil { + // This node is either not dirty, or already hashed. We copy it + n = n.copy() + } else { + // No copy + } + } else { + n = n.copy() + } n.flags = t.newFlag() n.Children[key[0]] = nn return true, n, nil @@ -258,6 +286,8 @@ func (t *Trie) Delete(key []byte) { // TryDelete removes any existing value for key from the trie. // If a node was not found in the database, a MissingNodeError is returned. func (t *Trie) TryDelete(key []byte) error { + t.unhashedCount++ + t.dirtyCount++ k := keybytesToHex(key) _, n, err := t.delete(t.root, nil, k) if err != nil { @@ -404,18 +434,25 @@ func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) { // Hash returns the root hash of the trie. It does not write to the // database and can be used even if the trie doesn't have one. func (t *Trie) Hash() common.Hash { - hash, cached, _ := t.hashRoot(nil, nil) + hash, cached, _ := t.hashRoot(nil) t.root = cached return common.BytesToHash(hash.(hashNode)) } -// Commit writes all nodes to the trie's memory database, tracking the internal +// oldCommit is the old implementation of Commit, which uses the +// regular hasher. +// It writes all nodes to the trie's memory database, tracking the internal // and external (for account tries) references. -func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { +func (t *Trie) oldCommit(onleaf LeafCallback) (root common.Hash, err error) { if t.db == nil { panic("commit called on trie with nil database") } - hash, cached, err := t.hashRoot(t.db, onleaf) + if t.root == nil { + return emptyRoot, nil + } + h := newHasher(onleaf) + defer returnHasherToPool(h) + hash, cached, err := h.hash(t.root, t.db, true) if err != nil { return common.Hash{}, err } @@ -423,7 +460,53 @@ func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { return common.BytesToHash(hash.(hashNode)), nil } -func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) { +func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { + if t.db == nil { + panic("commit called on trie with nil database") + } + if t.root == nil { + return emptyRoot, nil + } + rootHash := t.Hash() + h := newCommitter(onleaf, nil) + defer returnCommitterToPool(h) + var wg sync.WaitGroup + if onleaf != nil { + wg.Add(1) + go h.commitLoop(t.db, &wg) + } + _, err = h.commit(t.root, t.db, true) + if onleaf != nil { + close(h.leafCh) + wg.Wait() + } + if err != nil { + return common.Hash{}, err + } + t.dirtyCount = 0 + return rootHash, nil +} + +func (t *Trie) CommitTo(onleaf LeafCallback, dbi *DbInserter) (root common.Hash, err error) { + if t.db == nil { + panic("commit called on trie with nil database") + } + if t.root == nil { + return emptyRoot, nil + } + rootHash := t.Hash() + h := newCommitter(onleaf, dbi.inputCh) + h.leafCh = dbi.inputCh + _, err = h.commit(t.root, t.db, true) + if err != nil { + return common.Hash{}, err + } + t.dirtyCount = 0 + return rootHash, nil +} + +// oldHashRoot is the old implementation of hashRoot, which uses the regular hasher +func (t *Trie) oldHashRoot(db *Database, onleaf LeafCallback) (node, node, error) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil, nil } @@ -431,3 +514,16 @@ func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) { defer returnHasherToPool(h) return h.hash(t.root, db, true) } + +// hashRoot calculates the root hash of the given trie +func (t *Trie) hashRoot(db *Database) (node, node, error) { + if t.root == nil { + return hashNode(emptyRoot.Bytes()), nil, nil + } + // If the number of changes is below 100, we let one thread handle it + h := newPureHasher(t.unhashedCount >= 100) + defer returnPureHasherToPool(h) + hashed, cached := h.hash(t.root, true) + t.unhashedCount = 0 + return hashed, cached, nil +} diff --git a/trie/trie_test.go b/trie/trie_test.go index e53ac568e9c3..fc3a98c70705 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -161,7 +161,7 @@ func TestInsert(t *testing.T) { exp := common.HexToHash("8aad789dff2f538bca5d8ea56e8abe10f4c7ba3a5dea95fea4cd6e7c3a1168d3") root := trie.Hash() if root != exp { - t.Errorf("exp %x got %x", exp, root) + t.Errorf("case 1: exp %x got %x", exp, root) } trie = newEmpty() @@ -173,7 +173,7 @@ func TestInsert(t *testing.T) { t.Fatalf("commit error: %v", err) } if root != exp { - t.Errorf("exp %x got %x", exp, root) + t.Errorf("case 2: exp %x got %x", exp, root) } } @@ -316,6 +316,40 @@ func TestLargeValue(t *testing.T) { trie.Hash() } +// TestRandomCases tests som cases that were found via random fuzzing +func TestRandomCases(t *testing.T) { + var rt []randTestStep = []randTestStep{ + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 0 + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 1 + {op: 0, key: common.Hex2Bytes("d51b182b95d677e5f1c82508c0228de96b73092d78ce78b2230cd948674f66fd1483bd"), value: common.Hex2Bytes("0000000000000002")}, // step 2 + {op: 2, key: common.Hex2Bytes("c2a38512b83107d665c65235b0250002882ac2022eb00711552354832c5f1d030d0e408e"), value: common.Hex2Bytes("")}, // step 3 + {op: 3, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 4 + {op: 3, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 5 + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 6 + {op: 3, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 7 + {op: 0, key: common.Hex2Bytes("c2a38512b83107d665c65235b0250002882ac2022eb00711552354832c5f1d030d0e408e"), value: common.Hex2Bytes("0000000000000008")}, // step 8 + {op: 0, key: common.Hex2Bytes("d51b182b95d677e5f1c82508c0228de96b73092d78ce78b2230cd948674f66fd1483bd"), value: common.Hex2Bytes("0000000000000009")}, // step 9 + {op: 2, key: common.Hex2Bytes("fd"), value: common.Hex2Bytes("")}, // step 10 + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 11 + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 12 + {op: 0, key: common.Hex2Bytes("fd"), value: common.Hex2Bytes("000000000000000d")}, // step 13 + {op: 6, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 14 + {op: 1, key: common.Hex2Bytes("c2a38512b83107d665c65235b0250002882ac2022eb00711552354832c5f1d030d0e408e"), value: common.Hex2Bytes("")}, // step 15 + {op: 3, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 16 + {op: 0, key: common.Hex2Bytes("c2a38512b83107d665c65235b0250002882ac2022eb00711552354832c5f1d030d0e408e"), value: common.Hex2Bytes("0000000000000011")}, // step 17 + {op: 5, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 18 + {op: 3, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 19 + {op: 0, key: common.Hex2Bytes("d51b182b95d677e5f1c82508c0228de96b73092d78ce78b2230cd948674f66fd1483bd"), value: common.Hex2Bytes("0000000000000014")}, // step 20 + {op: 0, key: common.Hex2Bytes("d51b182b95d677e5f1c82508c0228de96b73092d78ce78b2230cd948674f66fd1483bd"), value: common.Hex2Bytes("0000000000000015")}, // step 21 + {op: 0, key: common.Hex2Bytes("c2a38512b83107d665c65235b0250002882ac2022eb00711552354832c5f1d030d0e408e"), value: common.Hex2Bytes("0000000000000016")}, // step 22 + {op: 5, key: common.Hex2Bytes(""), value: common.Hex2Bytes("")}, // step 23 + {op: 1, key: common.Hex2Bytes("980c393656413a15c8da01978ed9f89feb80b502f58f2d640e3a2f5f7a99a7018f1b573befd92053ac6f78fca4a87268"), value: common.Hex2Bytes("")}, // step 24 + {op: 1, key: common.Hex2Bytes("fd"), value: common.Hex2Bytes("")}, // step 25 + } + runRandTest(rt) + +} + // randTest performs random trie operations. // Instances of this test are created by Generate. type randTest []randTestStep @@ -375,6 +409,8 @@ func runRandTest(rt randTest) bool { values := make(map[string]string) // tracks content of the trie for i, step := range rt { + fmt.Printf("{op: %d, key: common.Hex2Bytes(\"%x\"), value: common.Hex2Bytes(\"%x\")}, // step %d\n", + step.op, step.key, step.value, i) switch step.op { case opUpdate: tr.Update(step.key, step.value) @@ -470,10 +506,13 @@ func benchGet(b *testing.B, commit bool) { func benchUpdate(b *testing.B, e binary.ByteOrder) *Trie { trie := newEmpty() k := make([]byte, 32) + b.ReportAllocs() + trie.batchStart() for i := 0; i < b.N; i++ { e.PutUint64(k, uint64(i)) trie.Update(k, k) } + trie.batchEnd() return trie } @@ -481,18 +520,135 @@ func benchUpdate(b *testing.B, e binary.ByteOrder) *Trie { // we cannot use b.N as the number of hashing rouns, since all rounds apart from // the first one will be NOOP. As such, we'll use b.N as the number of account to // insert into the trie before measuring the hashing. +// BenchmarkHash-6 288680 4561 ns/op 682 B/op 9 allocs/op +// BenchmarkHash-6 275095 4800 ns/op 685 B/op 9 allocs/op +// pure hasher: +// BenchmarkHash-6 319362 4230 ns/op 675 B/op 9 allocs/op +// BenchmarkHash-6 257460 4674 ns/op 689 B/op 9 allocs/op +// With hashing in-between and pure hasher: +// BenchmarkHash-6 225417 7150 ns/op 982 B/op 12 allocs/op +// BenchmarkHash-6 220378 6197 ns/op 983 B/op 12 allocs/op +// same with old hasher +// BenchmarkHash-6 229758 6437 ns/op 981 B/op 12 allocs/op +// BenchmarkHash-6 212610 7137 ns/op 986 B/op 12 allocs/op func BenchmarkHash(b *testing.B) { + // Create a realistic account trie to hash. We're first adding and hashing N + // entries, then adding N more. + addresses, accounts := makeAccounts(2 * b.N) + // Insert the accounts into the trie and hash it + trie := newEmpty() + i := 0 + for ; i < len(addresses)/2; i++ { + trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) + } + trie.Hash() + for ; i < len(addresses); i++ { + trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) + } + b.ResetTimer() + b.ReportAllocs() + //trie.hashRoot(nil, nil) + trie.Hash() +} + +type account struct { + Nonce uint64 + Balance *big.Int + Root common.Hash + Code []byte +} + +// Benchmarks the trie Commit following a Hash. Since the trie caches the result of any operation, +// we cannot use b.N as the number of hashing rouns, since all rounds apart from +// the first one will be NOOP. As such, we'll use b.N as the number of account to +// insert into the trie before measuring the hashing. +func BenchmarkCommitAfterHash(b *testing.B) { + b.Run("no-onleaf", func(b *testing.B) { + benchmarkCommitAfterHash(b, nil) + }) + var a account + onleaf := func(leaf []byte, parent common.Hash) error { + rlp.DecodeBytes(leaf, &a) + return nil + } + b.Run("with-onleaf", func(b *testing.B) { + benchmarkCommitAfterHash(b, onleaf) + }) +} + +func benchmarkCommitAfterHash(b *testing.B, onleaf LeafCallback) { // Make the random benchmark deterministic - random := rand.New(rand.NewSource(0)) + addresses, accounts := makeAccounts(b.N) + trie := newEmpty() + for i := 0; i < len(addresses); i++ { + trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) + } + // Insert the accounts into the trie and hash it + trie.Hash() + b.ResetTimer() + b.ReportAllocs() + trie.Commit(onleaf) +} + +func TestTinyTrie(t *testing.T) { + // Create a realistic account trie to hash + _, accounts := makeAccounts(10000) + trie := newEmpty() + trie.Update(common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000001337"), accounts[3]) + if exp, root := common.HexToHash("4fa6efd292cffa2db0083b8bedd23add2798ae73802442f52486e95c3df7111c"), trie.Hash(); exp != root { + t.Fatalf("1: got %x, exp %x", root, exp) + } + trie.Update(common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000001338"), accounts[4]) + if exp, root := common.HexToHash("cb5fb1213826dad9e604f095f8ceb5258fe6b5c01805ce6ef019a50699d2d479"), trie.Hash(); exp != root { + t.Fatalf("2: got %x, exp %x", root, exp) + } + trie.Update(common.Hex2Bytes("0000000000000000000000000000000000000000000000000000000000001339"), accounts[4]) + if exp, root := common.HexToHash("ed7e06b4010057d8703e7b9a160a6d42cf4021f9020da3c8891030349a646987"), trie.Hash(); exp != root { + t.Fatalf("3: got %x, exp %x", root, exp) + } + checktr, _ := New(common.Hash{}, trie.db) + it := NewIterator(trie.NodeIterator(nil)) + for it.Next() { + checktr.Update(it.Key, it.Value) + } + if troot, itroot := trie.Hash(), checktr.Hash(); troot != itroot { + t.Fatalf("hash mismatch in opItercheckhash, trie: %x, check: %x", troot, itroot) + } +} + +func TestCommitAfterHash(t *testing.T) { // Create a realistic account trie to hash - addresses := make([][20]byte, b.N) + addresses, accounts := makeAccounts(1000) + trie := newEmpty() + for i := 0; i < len(addresses); i++ { + trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) + } + // Insert the accounts into the trie and hash it + trie.Hash() + trie.Commit(nil) + root := trie.Hash() + exp := common.HexToHash("e5e9c29bb50446a4081e6d1d748d2892c6101c1e883a1f77cf21d4094b697822") + if exp != root { + t.Errorf("got %x, exp %x", root, exp) + } + root, _ = trie.Commit(nil) + if exp != root { + t.Errorf("got %x, exp %x", root, exp) + } +} + +func makeAccounts(size int) (addresses [][20]byte, accounts [][]byte) { + // Make the random benchmark deterministic + random := rand.New(rand.NewSource(0)) + // Create a realistic account trie to hash + addresses = make([][20]byte, size) for i := 0; i < len(addresses); i++ { for j := 0; j < len(addresses[i]); j++ { addresses[i][j] = byte(random.Intn(256)) } } - accounts := make([][]byte, len(addresses)) + accounts = make([][]byte, len(addresses)) for i := 0; i < len(accounts); i++ { var ( nonce = uint64(random.Int63()) @@ -500,16 +656,116 @@ func BenchmarkHash(b *testing.B) { root = emptyRoot code = crypto.Keccak256(nil) ) - accounts[i], _ = rlp.EncodeToBytes([]interface{}{nonce, balance, root, code}) + accounts[i], _ = rlp.EncodeToBytes(&account{nonce, balance, root, code}) } - // Insert the accounts into the trie and hash it + return addresses, accounts +} + +// BenchmarkCommitAfterHashFixedSize benchmarks the Commit (after Hash) of a fixed number of updates to a trie. +// This benchmark is meant to capture the difference on efficiency of small versus large changes. Typically, +// storage tries are small (a couple of entries), whereas the full post-block account trie update is large (a couple +// of thousand entries) +func BenchmarkHashFixedSize(b *testing.B) { + b.Run("10", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(20) + for i := 0; i < b.N; i++ { + benchmarkHashFixedSize(b, acc, add) + } + }) + b.Run("100", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(100) + for i := 0; i < b.N; i++ { + benchmarkHashFixedSize(b, acc, add) + } + }) + + b.Run("1K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(1000) + for i := 0; i < b.N; i++ { + benchmarkHashFixedSize(b, acc, add) + } + }) + b.Run("10K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(10000) + for i := 0; i < b.N; i++ { + benchmarkHashFixedSize(b, acc, add) + } + }) + b.Run("100K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(100000) + for i := 0; i < b.N; i++ { + benchmarkHashFixedSize(b, acc, add) + } + }) +} + +func benchmarkHashFixedSize(b *testing.B, addresses [][20]byte, accounts [][]byte) { + b.ReportAllocs() trie := newEmpty() for i := 0; i < len(addresses); i++ { trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) } - b.ResetTimer() + // Insert the accounts into the trie and hash it + b.StartTimer() + trie.Hash() + b.StopTimer() +} + +func BenchmarkCommitAfterHashFixedSize(b *testing.B) { + b.Run("10", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(20) + for i := 0; i < b.N; i++ { + benchmarkCommitAfterHashFixedSize(b, acc, add) + } + }) + b.Run("100", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(100) + for i := 0; i < b.N; i++ { + benchmarkCommitAfterHashFixedSize(b, acc, add) + } + }) + + b.Run("1K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(1000) + for i := 0; i < b.N; i++ { + benchmarkCommitAfterHashFixedSize(b, acc, add) + } + }) + b.Run("10K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(10000) + for i := 0; i < b.N; i++ { + benchmarkCommitAfterHashFixedSize(b, acc, add) + } + }) + b.Run("100K", func(b *testing.B) { + b.StopTimer() + acc, add := makeAccounts(100000) + for i := 0; i < b.N; i++ { + benchmarkCommitAfterHashFixedSize(b, acc, add) + } + }) +} + +func benchmarkCommitAfterHashFixedSize(b *testing.B, addresses [][20]byte, accounts [][]byte) { b.ReportAllocs() + trie := newEmpty() + for i := 0; i < len(addresses); i++ { + trie.Update(crypto.Keccak256(addresses[i][:]), accounts[i]) + } + // Insert the accounts into the trie and hash it trie.Hash() + b.StartTimer() + trie.Commit(nil) + b.StopTimer() } func tempDB() (string, *Database) {