diff --git a/config/consensus.go b/config/consensus.go index 4642e9d76a..89209789f0 100644 --- a/config/consensus.go +++ b/config/consensus.go @@ -704,8 +704,8 @@ var MaxBytesKeyValueLen int // of the consensus protocols. used for decoding purposes. var MaxExtraAppProgramLen int -// MaxAvailableAppProgramLen is the largest supported app program size include the extra pages -// supported supported by any of the consensus protocols. used for decoding purposes. +// MaxAvailableAppProgramLen is the largest supported app program size including the extra +// pages supported by any of the consensus protocols. used for decoding purposes. var MaxAvailableAppProgramLen int // MaxProposedExpiredOnlineAccounts is the maximum number of online accounts diff --git a/data/transactions/verify/verifiedTxnCache.go b/data/transactions/verify/verifiedTxnCache.go index b1220f9aa2..fa562c2a2f 100644 --- a/data/transactions/verify/verifiedTxnCache.go +++ b/data/transactions/verify/verifiedTxnCache.go @@ -49,12 +49,12 @@ var errTooManyPinnedEntries = &VerifiedTxnCacheError{errors.New("Too many pinned // errMissingPinnedEntry is being generated when we're trying to pin a transaction that does not appear in the cache var errMissingPinnedEntry = &VerifiedTxnCacheError{errors.New("Missing pinned entry")} -// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is desiged two have two separate "levels". On the +// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is designed to have two separate "levels". On the // bottom tier, the cache would be using a cyclic buffer, where old transactions would end up overridden by new ones. In order to support transactions // that goes into the transaction pool, we have a higher tier of pinned cache. 
Pinned transactions would not be cycled-away by new incoming transactions, // and would only get eliminated by updates to the transaction pool, which would inform the cache of updates to the pinned items. type VerifiedTransactionCache interface { - // Add adds a given transaction group and it's associated group context to the cache. If any of the transactions already appear + // Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear // in the cache, the new entry overrides the old one. Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) // AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts. diff --git a/data/txDupCache.go b/data/txDupCache.go index f05f89fbd8..b9981a4574 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -31,7 +31,7 @@ import ( ) // digestCache is a rotating cache of size N accepting crypto.Digest as a key -// and keeping up to 2*N elements in memory +// and keeping up to 2*maxSize elements in memory type digestCache struct { cur map[crypto.Digest]struct{} prev map[crypto.Digest]struct{} @@ -49,11 +49,11 @@ func makeDigestCache(size int) *digestCache { } // check if digest d is in a cache. -// locking semantic: write lock must be taken -func (c *digestCache) check(d *crypto.Digest) bool { - _, found := c.cur[*d] +// locking semantic: read lock must be taken +func (c *digestCache) check(d crypto.Digest) bool { + _, found := c.cur[d] if !found { - _, found = c.prev[*d] + _, found = c.prev[d] } return found } @@ -67,15 +67,15 @@ func (c *digestCache) swap() { // put adds digest d into a cache. 
// locking semantic: write lock must be taken -func (c *digestCache) put(d *crypto.Digest) { +func (c *digestCache) put(d crypto.Digest) { if len(c.cur) >= c.maxSize { c.swap() } - c.cur[*d] = struct{}{} + c.cur[d] = struct{}{} } // CheckAndPut adds digest d into a cache if not found -func (c *digestCache) CheckAndPut(d *crypto.Digest) bool { +func (c *digestCache) CheckAndPut(d crypto.Digest) bool { c.mu.Lock() defer c.mu.Unlock() if c.check(d) { @@ -94,11 +94,11 @@ func (c *digestCache) Len() int { } // Delete from the cache -func (c *digestCache) Delete(d *crypto.Digest) { +func (c *digestCache) Delete(d crypto.Digest) { c.mu.Lock() defer c.mu.Unlock() - delete(c.cur, *d) - delete(c.prev, *d) + delete(c.cur, d) + delete(c.prev, d) } // txSaltedCache is a digest cache with a rotating salt @@ -179,8 +179,8 @@ func (c *txSaltedCache) innerSwap(scheduled bool) { } // innerCheck returns true if exists, and the current salted hash if does not. -// locking semantic: write lock must be held -func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { +// locking semantic: READ lock must be held, cache is not mutated +func (c *txSaltedCache) innerCheck(msg []byte) (crypto.Digest, bool) { ptr := saltedPool.Get() defer saltedPool.Put(ptr) @@ -193,7 +193,7 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { _, found := c.cur[d] if found { - return nil, true + return crypto.Digest{}, true } toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...) @@ -201,14 +201,14 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) { pd := crypto.Digest(blake2b.Sum256(toBeHashed)) _, found = c.prev[pd] if found { - return nil, true + return crypto.Digest{}, true } - return &d, false + return d, false } // CheckAndPut adds msg into a cache if not found // returns a hashing key used for insertion if the message not found. 
-func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { +func (c *txSaltedCache) CheckAndPut(msg []byte) (crypto.Digest, bool) { c.mu.RLock() d, found := c.innerCheck(msg) salt := c.curSalt @@ -231,7 +231,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { } else { // Do another check to see if another copy of the transaction won the race to write it to the cache // Only check current to save a lookup since swaps are rare and no need to re-hash - if _, found := c.cur[*d]; found { + if _, found := c.cur[d]; found { return d, found } } @@ -246,16 +246,15 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) { toBeHashed = append(toBeHashed, c.curSalt[:]...) toBeHashed = toBeHashed[:len(msg)+len(c.curSalt)] - dn := crypto.Digest(blake2b.Sum256(toBeHashed)) - d = &dn + d = crypto.Digest(blake2b.Sum256(toBeHashed)) } - c.cur[*d] = struct{}{} + c.cur[d] = struct{}{} return d, false } // DeleteByKey from the cache by using a key used for insertion -func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) { +func (c *txSaltedCache) DeleteByKey(d crypto.Digest) { c.digestCache.Delete(d) } diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index bf10bade52..aee303fc54 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -45,10 +45,10 @@ func TestTxHandlerDigestCache(t *testing.T) { var ds [size]crypto.Digest for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) - exist := cache.CheckAndPut(&ds[i]) + exist := cache.CheckAndPut(ds[i]) require.False(t, exist) - exist = cache.check(&ds[i]) + exist = cache.check(ds[i]) require.True(t, exist) } @@ -56,7 +56,7 @@ func TestTxHandlerDigestCache(t *testing.T) { // try to re-add, ensure not added for i := 0; i < size; i++ { - exist := cache.CheckAndPut(&ds[i]) + exist := cache.CheckAndPut(ds[i]) require.True(t, exist) } @@ -66,10 +66,10 @@ func TestTxHandlerDigestCache(t *testing.T) { var ds2 [size]crypto.Digest for i := 0; i < size; i++ 
{ crypto.RandBytes(ds2[i][:]) - exist := cache.CheckAndPut(&ds2[i]) + exist := cache.CheckAndPut(ds2[i]) require.False(t, exist) - exist = cache.check(&ds2[i]) + exist = cache.check(ds2[i]) require.True(t, exist) } @@ -77,34 +77,34 @@ func TestTxHandlerDigestCache(t *testing.T) { var d crypto.Digest crypto.RandBytes(d[:]) - exist := cache.CheckAndPut(&d) + exist := cache.CheckAndPut(d) require.False(t, exist) - exist = cache.check(&d) + exist = cache.check(d) require.True(t, exist) require.Equal(t, size+1, cache.Len()) // ensure hashes from the prev batch are still there for i := 0; i < size; i++ { - exist := cache.check(&ds2[i]) + exist := cache.check(ds2[i]) require.True(t, exist) } // ensure hashes from the first batch are gone for i := 0; i < size; i++ { - exist := cache.check(&ds[i]) + exist := cache.check(ds[i]) require.False(t, exist) } // check deletion works for i := 0; i < size; i++ { - cache.Delete(&ds[i]) - cache.Delete(&ds2[i]) + cache.Delete(ds[i]) + cache.Delete(ds2[i]) } require.Equal(t, 1, cache.Len()) - cache.Delete(&d) + cache.Delete(d) require.Equal(t, 0, cache.Len()) } @@ -125,7 +125,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { // add some unique random var ds [size][8]byte - var ks [size]*crypto.Digest + var ks [size]crypto.Digest var exist bool for i := 0; i < size; i++ { crypto.RandBytes([]byte(ds[i][:])) @@ -150,7 +150,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { // add some more and ensure capacity switch var ds2 [size][8]byte - var ks2 [size]*crypto.Digest + var ks2 [size]crypto.Digest for i := 0; i < size; i++ { crypto.RandBytes(ds2[i][:]) ks2[i], exist = cache.CheckAndPut(ds2[i][:]) @@ -309,7 +309,7 @@ func (p *digestCachePusher) push() { var d [crypto.DigestSize]byte crypto.RandBytes(d[:]) h := crypto.Digest(blake2b.Sum256(d[:])) // digestCache does not hashes so calculate hash here - p.c.CheckAndPut(&h) + p.c.CheckAndPut(h) } func (p *saltedCachePusher) push() { @@ -342,6 +342,7 @@ func BenchmarkDigestCaches(b 
*testing.B) { } for _, bench := range benchmarks { b.Run(fmt.Sprintf("%T/threads=%d", bench.maker, bench.numThreads), func(b *testing.B) { + b.ReportAllocs() benchmarkDigestCache(b, bench.maker, bench.numThreads) }) } diff --git a/data/txHandler.go b/data/txHandler.go index 65fd869d7e..edc233c0b3 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -104,8 +104,8 @@ const ( type txBacklogMsg struct { rawmsg *network.IncomingMessage // the raw message from the network unverifiedTxGroup []transactions.SignedTxn // the unverified ( and signed ) transaction group - rawmsgDataHash *crypto.Digest // hash (if any) of raw message data from the network - unverifiedTxGroupHash *crypto.Digest // hash (if any) of the unverifiedTxGroup + rawmsgDataHash crypto.Digest // hash (or IsZero) of raw message data from the network + unverifiedTxGroupHash crypto.Digest // hash (or IsZero) of the unverifiedTxGroup verificationErr error // The verification error generated by the verification function, if any. 
capguard *util.ErlCapacityGuard // the structure returned from the elastic rate limiter, to be released when dequeued } @@ -114,8 +114,6 @@ type txBacklogMsg struct { type TxHandler struct { txPool *pools.TransactionPool ledger *Ledger - genesisID string - genesisHash crypto.Digest txVerificationPool execpool.BacklogPool backlogQueue chan *txBacklogMsg backlogCongestionThreshold float64 @@ -144,8 +142,6 @@ type TxHandlerOpts struct { ExecutionPool execpool.BacklogPool Ledger *Ledger Net network.GossipNode - GenesisID string - GenesisHash crypto.Digest Config config.Local } @@ -173,8 +169,6 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { handler := &TxHandler{ txPool: opts.TxPool, - genesisID: opts.GenesisID, - genesisHash: opts.GenesisHash, ledger: opts.Ledger, txVerificationPool: opts.ExecutionPool, backlogQueue: make(chan *txBacklogMsg, txBacklogSize), @@ -220,14 +214,13 @@ func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { } // prepare the batch processor for pubsub synchronous verification - var err0 error - handler.batchVerifier, err0 = verify.MakeSigVerifier(handler.ledger, handler.ledger.VerifiedTransactionCache()) - if err0 != nil { - return nil, err0 + var err error + handler.batchVerifier, err = verify.MakeSigVerifier(handler.ledger, handler.ledger.VerifiedTransactionCache()) + if err != nil { + return nil, err } // prepare the transaction stream verifier - var err error txnElementProcessor, err := verify.MakeSigVerifyJobProcessor(handler.ledger, handler.ledger.VerifiedTransactionCache(), handler.postVerificationQueue, handler.streamVerifierDropped) if err != nil { @@ -545,19 +538,19 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender) } -func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *crypto.Digest) { - if handler.txCanonicalCache != nil && canonicalKey != nil { +func (handler 
*TxHandler) deleteFromCaches(msgKey crypto.Digest, canonicalKey crypto.Digest) { + if handler.txCanonicalCache != nil && !canonicalKey.IsZero() { handler.txCanonicalCache.Delete(canonicalKey) } - if handler.msgCache != nil && msgKey != nil { + if handler.msgCache != nil && !msgKey.IsZero() { handler.msgCache.DeleteByKey(msgKey) } } // dedupCanonical checks if the transaction group has been seen before after reencoding to canonical representation. // returns a key used for insertion if the group was not found. -func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key *crypto.Digest, reencoded []byte, isDup bool) { +func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.SignedTxn, consumed int) (key crypto.Digest, reencoded []byte, isDup bool) { // consider situations where someone want to censor transactions A // 1. Txn A is not part of a group => txn A with a valid signature is OK // Censorship attempts are: @@ -580,8 +573,8 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed // a single transaction => cache/dedup canonical txn with its signature enc := unverifiedTxGroup[0].MarshalMsg(nil) d = crypto.Hash(enc) - if handler.txCanonicalCache.CheckAndPut(&d) { - return nil, nil, true + if handler.txCanonicalCache.CheckAndPut(d) { + return crypto.Digest{}, nil, true } reencodedBuf = enc } else { @@ -594,23 +587,23 @@ func (handler *TxHandler) dedupCanonical(unverifiedTxGroup []transactions.Signed // reallocated, some assumption on size was wrong // log and skip logging.Base().Warnf("Decoded size %d does not match to encoded %d", consumed, len(encodeBuf)) - return nil, nil, false + return crypto.Digest{}, nil, false } d = crypto.Hash(encodeBuf) - if handler.txCanonicalCache.CheckAndPut(&d) { - return nil, nil, true + if handler.txCanonicalCache.CheckAndPut(d) { + return crypto.Digest{}, nil, true } reencodedBuf = encodeBuf } - return &d, reencodedBuf, false + return d, 
reencodedBuf, false } // incomingMsgDupCheck runs the duplicate check on a raw incoming message. // Returns: // - the key used for insertion if the message was not found in the cache // - a boolean indicating if the message was a duplicate -func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool) { - var msgKey *crypto.Digest +func (handler *TxHandler) incomingMsgDupCheck(data []byte) (crypto.Digest, bool) { + var msgKey crypto.Digest var isDup bool if handler.msgCache != nil { // check for duplicate messages @@ -701,14 +694,14 @@ func decodeMsg(data []byte) (unverifiedTxGroup []transactions.SignedTxn, consume // incomingTxGroupCanonicalDedup checks if the incoming transaction group has been seen before after reencoding to canonical representation. // It also return canonical representation of the transaction group allowing the caller to compare it with the input. -func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (*crypto.Digest, []byte, bool) { - var canonicalKey *crypto.Digest +func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []transactions.SignedTxn, encodedExpectedSize int) (crypto.Digest, []byte, bool) { + var canonicalKey crypto.Digest var reencoded []byte if handler.txCanonicalCache != nil { var isDup bool if canonicalKey, reencoded, isDup = handler.dedupCanonical(unverifiedTxGroup, encodedExpectedSize); isDup { transactionMessagesDupCanonical.Inc(nil) - return nil, nil, true + return crypto.Digest{}, nil, true } } return canonicalKey, reencoded, false @@ -891,11 +884,9 @@ var errBackLogFullLocal = errors.New("backlog full") func (handler *TxHandler) LocalTransaction(txgroup []transactions.SignedTxn) error { select { case handler.backlogQueue <- &txBacklogMsg{ - rawmsg: &network.IncomingMessage{}, - unverifiedTxGroup: txgroup, - rawmsgDataHash: nil, - unverifiedTxGroupHash: nil, - capguard: nil, + rawmsg: &network.IncomingMessage{}, + 
unverifiedTxGroup: txgroup, + capguard: nil, }: default: transactionMessagesDroppedFromBacklog.Inc(nil) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 67e82468fa..90452a8258 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -825,7 +825,7 @@ func makeTestTxHandler(dl *Ledger, cfg config.Local) (*TxHandler, error) { tp := pools.MakeTransactionPool(dl.Ledger, cfg, logging.Base(), nil) backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) opts := TxHandlerOpts{ - tp, backlogPool, dl, &mocks.MockNetwork{}, "", crypto.Digest{}, cfg, + tp, backlogPool, dl, &mocks.MockNetwork{}, cfg, } return MakeTxHandler(opts) } @@ -2349,19 +2349,19 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall func TestMakeTxHandlerErrors(t *testing.T) { partitiontest.PartitionTest(t) opts := TxHandlerOpts{ - nil, nil, nil, &mocks.MockNetwork{}, "", crypto.Digest{}, config.Local{}, + nil, nil, nil, &mocks.MockNetwork{}, config.Local{}, } _, err := MakeTxHandler(opts) require.Error(t, err, ErrInvalidTxPool) opts = TxHandlerOpts{ - &pools.TransactionPool{}, nil, nil, &mocks.MockNetwork{}, "", crypto.Digest{}, config.Local{}, + &pools.TransactionPool{}, nil, nil, &mocks.MockNetwork{}, config.Local{}, } _, err = MakeTxHandler(opts) require.Error(t, err, ErrInvalidLedger) // it is not possible to test MakeStreamVerifier returning an error, because it is not possible to - // get the leger return an error for returining the header of its latest round + // get the ledger to return an error for returning the header of its latest round } // TestTxHandlerRestartWithBacklogAndTxPool starts txHandler, sends transactions, diff --git a/node/node.go b/node/node.go index 38dea020a1..15b2bea5e5 100644 --- a/node/node.go +++ b/node/node.go @@ -262,8 +262,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd ExecutionPool: node.lowPriorityCryptoVerificationPool, Ledger: node.ledger, Net: node.net, - 
GenesisID: node.genesisID, - GenesisHash: node.genesisHash, Config: cfg, } node.txHandler, err = data.MakeTxHandler(txHandlerOpts)