Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,8 @@ var MaxBytesKeyValueLen int
// of the consensus protocols. used for decoding purposes.
var MaxExtraAppProgramLen int

// MaxAvailableAppProgramLen is the largest supported app program size include the extra pages
// supported supported by any of the consensus protocols. used for decoding purposes.
// MaxAvailableAppProgramLen is the largest supported app program size including the extra
// pages supported by any of the consensus protocols. used for decoding purposes.
var MaxAvailableAppProgramLen int

// MaxProposedExpiredOnlineAccounts is the maximum number of online accounts
Expand Down
4 changes: 2 additions & 2 deletions data/transactions/verify/verifiedTxnCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ var errTooManyPinnedEntries = &VerifiedTxnCacheError{errors.New("Too many pinned
// errMissingPinnedEntry is being generated when we're trying to pin a transaction that does not appear in the cache
var errMissingPinnedEntry = &VerifiedTxnCacheError{errors.New("Missing pinned entry")}

// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is desiged two have two separate "levels". On the
// VerifiedTransactionCache provides a cached store of recently verified transactions. The cache is designed to have two separate "levels". On the
// bottom tier, the cache would be using a cyclic buffer, where old transactions would end up overridden by new ones. In order to support transactions
// that goes into the transaction pool, we have a higher tier of pinned cache. Pinned transactions would not be cycled-away by new incoming transactions,
// and would only get eliminated by updates to the transaction pool, which would inform the cache of updates to the pinned items.
type VerifiedTransactionCache interface {
// Add adds a given transaction group and it's associated group context to the cache. If any of the transactions already appear
// Add adds a given transaction group and its associated group context to the cache. If any of the transactions already appear
// in the cache, the new entry overrides the old one.
Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext)
// AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts.
Expand Down
43 changes: 21 additions & 22 deletions data/txDupCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

// digestCache is a rotating cache of size N accepting crypto.Digest as a key
// and keeping up to 2*N elements in memory
// and keeping up to 2*maxSize elements in memory
type digestCache struct {
cur map[crypto.Digest]struct{}
prev map[crypto.Digest]struct{}
Expand All @@ -49,11 +49,11 @@ func makeDigestCache(size int) *digestCache {
}

// check if digest d is in a cache.
// locking semantic: write lock must be taken
func (c *digestCache) check(d *crypto.Digest) bool {
_, found := c.cur[*d]
// locking semantic: read lock must be taken
func (c *digestCache) check(d crypto.Digest) bool {
_, found := c.cur[d]
if !found {
_, found = c.prev[*d]
_, found = c.prev[d]
}
return found
}
Expand All @@ -67,15 +67,15 @@ func (c *digestCache) swap() {

// put adds digest d into a cache.
// locking semantic: write lock must be taken
func (c *digestCache) put(d *crypto.Digest) {
func (c *digestCache) put(d crypto.Digest) {
if len(c.cur) >= c.maxSize {
c.swap()
}
c.cur[*d] = struct{}{}
c.cur[d] = struct{}{}
}

// CheckAndPut adds digest d into a cache if not found
func (c *digestCache) CheckAndPut(d *crypto.Digest) bool {
func (c *digestCache) CheckAndPut(d crypto.Digest) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.check(d) {
Expand All @@ -94,11 +94,11 @@ func (c *digestCache) Len() int {
}

// Delete from the cache
func (c *digestCache) Delete(d *crypto.Digest) {
func (c *digestCache) Delete(d crypto.Digest) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cur, *d)
delete(c.prev, *d)
delete(c.cur, d)
delete(c.prev, d)
}

// txSaltedCache is a digest cache with a rotating salt
Expand Down Expand Up @@ -179,8 +179,8 @@ func (c *txSaltedCache) innerSwap(scheduled bool) {
}

// innerCheck returns true if exists, and the current salted hash if does not.
// locking semantic: write lock must be held
func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) {
// locking semantic: READ lock must be held, cache is not mutated
func (c *txSaltedCache) innerCheck(msg []byte) (crypto.Digest, bool) {
ptr := saltedPool.Get()
defer saltedPool.Put(ptr)

Expand All @@ -193,22 +193,22 @@ func (c *txSaltedCache) innerCheck(msg []byte) (*crypto.Digest, bool) {

_, found := c.cur[d]
if found {
return nil, true
return crypto.Digest{}, true
}

toBeHashed = append(toBeHashed[:len(msg)], c.prevSalt[:]...)
toBeHashed = toBeHashed[:len(msg)+len(c.prevSalt)]
pd := crypto.Digest(blake2b.Sum256(toBeHashed))
_, found = c.prev[pd]
if found {
return nil, true
return crypto.Digest{}, true
}
return &d, false
return d, false
}

// CheckAndPut adds msg into a cache if not found
// returns a hashing key used for insertion if the message not found.
func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
func (c *txSaltedCache) CheckAndPut(msg []byte) (crypto.Digest, bool) {
c.mu.RLock()
d, found := c.innerCheck(msg)
salt := c.curSalt
Expand All @@ -231,7 +231,7 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
} else {
// Do another check to see if another copy of the transaction won the race to write it to the cache
// Only check current to save a lookup since swaps are rare and no need to re-hash
if _, found := c.cur[*d]; found {
if _, found := c.cur[d]; found {
return d, found
}
}
Expand All @@ -246,16 +246,15 @@ func (c *txSaltedCache) CheckAndPut(msg []byte) (*crypto.Digest, bool) {
toBeHashed = append(toBeHashed, c.curSalt[:]...)
toBeHashed = toBeHashed[:len(msg)+len(c.curSalt)]

dn := crypto.Digest(blake2b.Sum256(toBeHashed))
d = &dn
d = crypto.Digest(blake2b.Sum256(toBeHashed))
}

c.cur[*d] = struct{}{}
c.cur[d] = struct{}{}
return d, false
}

// DeleteByKey from the cache by using a key used for insertion
func (c *txSaltedCache) DeleteByKey(d *crypto.Digest) {
func (c *txSaltedCache) DeleteByKey(d crypto.Digest) {
c.digestCache.Delete(d)
}

Expand Down
31 changes: 16 additions & 15 deletions data/txDupCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ func TestTxHandlerDigestCache(t *testing.T) {
var ds [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes([]byte(ds[i][:]))
exist := cache.CheckAndPut(&ds[i])
exist := cache.CheckAndPut(ds[i])
require.False(t, exist)

exist = cache.check(&ds[i])
exist = cache.check(ds[i])
require.True(t, exist)
}

require.Equal(t, size, cache.Len())

// try to re-add, ensure not added
for i := 0; i < size; i++ {
exist := cache.CheckAndPut(&ds[i])
exist := cache.CheckAndPut(ds[i])
require.True(t, exist)
}

Expand All @@ -66,45 +66,45 @@ func TestTxHandlerDigestCache(t *testing.T) {
var ds2 [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes(ds2[i][:])
exist := cache.CheckAndPut(&ds2[i])
exist := cache.CheckAndPut(ds2[i])
require.False(t, exist)

exist = cache.check(&ds2[i])
exist = cache.check(ds2[i])
require.True(t, exist)
}

require.Equal(t, 2*size, cache.Len())

var d crypto.Digest
crypto.RandBytes(d[:])
exist := cache.CheckAndPut(&d)
exist := cache.CheckAndPut(d)
require.False(t, exist)
exist = cache.check(&d)
exist = cache.check(d)
require.True(t, exist)

require.Equal(t, size+1, cache.Len())

// ensure hashes from the prev batch are still there
for i := 0; i < size; i++ {
exist := cache.check(&ds2[i])
exist := cache.check(ds2[i])
require.True(t, exist)
}

// ensure hashes from the first batch are gone
for i := 0; i < size; i++ {
exist := cache.check(&ds[i])
exist := cache.check(ds[i])
require.False(t, exist)
}

// check deletion works
for i := 0; i < size; i++ {
cache.Delete(&ds[i])
cache.Delete(&ds2[i])
cache.Delete(ds[i])
cache.Delete(ds2[i])
}

require.Equal(t, 1, cache.Len())

cache.Delete(&d)
cache.Delete(d)
require.Equal(t, 0, cache.Len())
}

Expand All @@ -125,7 +125,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) {

// add some unique random
var ds [size][8]byte
var ks [size]*crypto.Digest
var ks [size]crypto.Digest
var exist bool
for i := 0; i < size; i++ {
crypto.RandBytes([]byte(ds[i][:]))
Expand All @@ -150,7 +150,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) {

// add some more and ensure capacity switch
var ds2 [size][8]byte
var ks2 [size]*crypto.Digest
var ks2 [size]crypto.Digest
for i := 0; i < size; i++ {
crypto.RandBytes(ds2[i][:])
ks2[i], exist = cache.CheckAndPut(ds2[i][:])
Expand Down Expand Up @@ -309,7 +309,7 @@ func (p *digestCachePusher) push() {
var d [crypto.DigestSize]byte
crypto.RandBytes(d[:])
h := crypto.Digest(blake2b.Sum256(d[:])) // digestCache does not hashes so calculate hash here
p.c.CheckAndPut(&h)
p.c.CheckAndPut(h)
}

func (p *saltedCachePusher) push() {
Expand Down Expand Up @@ -342,6 +342,7 @@ func BenchmarkDigestCaches(b *testing.B) {
}
for _, bench := range benchmarks {
b.Run(fmt.Sprintf("%T/threads=%d", bench.maker, bench.numThreads), func(b *testing.B) {
b.ReportAllocs()
benchmarkDigestCache(b, bench.maker, bench.numThreads)
})
}
Expand Down
Loading