Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[utils/bloom] Optionally Update Bloom Filter Size on Reset #2591

Merged
merged 41 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8d2b459
Optimize bloom filter
StephenButtolph Jan 4, 2024
1a845f0
remove murmur
StephenButtolph Jan 4, 2024
d94b846
Update serialization logic and invert probability calculations
StephenButtolph Jan 5, 2024
de764ef
special cases
StephenButtolph Jan 5, 2024
83cbe38
test
StephenButtolph Jan 5, 2024
1d0b0a3
add fuzzing
StephenButtolph Jan 5, 2024
e1b9490
Prevent undefined behavior
StephenButtolph Jan 5, 2024
2ef237e
nits
StephenButtolph Jan 5, 2024
ef2cb2b
nit
StephenButtolph Jan 5, 2024
97e4958
nit
StephenButtolph Jan 5, 2024
5ab52b0
cleanup edge case handling
StephenButtolph Jan 5, 2024
7104b09
add comment
StephenButtolph Jan 5, 2024
7ac29af
nit
StephenButtolph Jan 5, 2024
6eaf8d9
Remove test flaking on platform specific results from math.Log
StephenButtolph Jan 5, 2024
5369c92
Merge branch 'dev' into optimize-bloom-filter
StephenButtolph Jan 5, 2024
5de074f
dynamically set new expected elements
patrick-ogrady Jan 5, 2024
2f5c983
update bloom elements
patrick-ogrady Jan 5, 2024
9293aee
auto-resize bloom
patrick-ogrady Jan 5, 2024
5989c75
remove implicit multiplication
patrick-ogrady Jan 5, 2024
bac9851
add min target
patrick-ogrady Jan 5, 2024
2dc23a3
remove unnecessary type arg
patrick-ogrady Jan 5, 2024
5cdb473
revert err removal
patrick-ogrady Jan 5, 2024
24e3aec
tests passing
patrick-ogrady Jan 5, 2024
067ee2e
nit to false positive
patrick-ogrady Jan 5, 2024
cbbe5c6
update platform mempool
patrick-ogrady Jan 5, 2024
da1a784
update avm
patrick-ogrady Jan 5, 2024
00f7fa1
update mocks
patrick-ogrady Jan 5, 2024
24de51f
remove safemath alias
patrick-ogrady Jan 5, 2024
129f938
fix avm tests
patrick-ogrady Jan 5, 2024
41587de
fix platformvm tests
patrick-ogrady Jan 5, 2024
314229f
update coreth
patrick-ogrady Jan 5, 2024
790b8a5
add churn multiplier
patrick-ogrady Jan 6, 2024
a774b9a
coreth latest
patrick-ogrady Jan 6, 2024
d3a56c0
Merge branch 'dev' into dynamic-reset-size
patrick-ogrady Jan 10, 2024
1ea9d10
leq
patrick-ogrady Jan 10, 2024
3ca23d2
remove marshal error
patrick-ogrady Jan 10, 2024
a905aae
remove GetFilter error
patrick-ogrady Jan 10, 2024
7aea9f1
update coreth
patrick-ogrady Jan 10, 2024
0971979
actually check reset
patrick-ogrady Jan 10, 2024
731ef28
remove unused Parameters() function
patrick-ogrady Jan 10, 2024
0afac98
update coreth
patrick-ogrady Jan 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.10-rc.4
github.com/ava-labs/coreth v0.12.9-rc.9.0.20240110070856-c8809cc3c436
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down Expand Up @@ -107,7 +107,6 @@ require (
github.com/hashicorp/go-bexpr v0.1.10 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e // indirect
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.10-rc.4 h1:+Ll3cpi3tZbw37lTa+1a/VnPa7xZktpbAFiuKtDKnIE=
github.com/ava-labs/coreth v0.12.10-rc.4/go.mod h1:8pt5LW6MP/luIdhQA+gvs8LobKE8tP/5APXUiFbfb2c=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20240110070856-c8809cc3c436 h1:s2LATNf3948KEIkgpUONktGKUhaU3rb6/o2b4KA1bcQ=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20240110070856-c8809cc3c436/go.mod h1:fHAYNaRtIA+rkrHzy8i2HhXyj6dqC64C5ZfiXymZlQM=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down Expand Up @@ -353,8 +353,6 @@ github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d h1:dg1dEPuW
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e h1:pIYdhNkDh+YENVNi3gto8n9hAmRxKxoar0iE6BLucjw=
github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e/go.mod h1:j9cQbcqHQujT0oKJ38PylVfqohClLr3CvDC+Qcg+lhU=
github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao=
github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA=
github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c h1:DZfsyhDK1hnSS5lH8l+JggqzEleHteTYfutAiVlSUM8=
Expand Down
64 changes: 40 additions & 24 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,48 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/math"
)

// NewBloomFilter returns a new instance of a bloom filter with at most
// [maxExpectedElements] elements anticipated at any moment, and a false
// positive probability of [falsePositiveProbability].
// NewBloomFilter returns a new instance of a bloom filter with at least [minTargetElements] elements
// anticipated at any moment, and a false positive probability of [targetFalsePositiveProbability]. If the
// false positive probability exceeds [resetFalsePositiveProbability], the bloom filter will be reset.
//
// Invariant: The returned bloom filter is not safe to reset concurrently with
// other operations. However, it is otherwise safe to access concurrently.
func NewBloomFilter(
maxExpectedElements int,
falsePositiveProbability float64,
minTargetElements int,
targetFalsePositiveProbability,
resetFalsePositiveProbability float64,
) (*BloomFilter, error) {
bloom, err := bloom.New(bloom.OptimalParameters(
maxExpectedElements,
falsePositiveProbability,
))
numHashes, numEntries := bloom.OptimalParameters(
patrick-ogrady marked this conversation as resolved.
Show resolved Hide resolved
minTargetElements,
targetFalsePositiveProbability,
)
b, err := bloom.New(numHashes, numEntries)
if err != nil {
return nil, err
}

salt, err := randomSalt()
return &BloomFilter{
bloom: bloom,
salt: salt,
minTargetElements: minTargetElements,
targetFalsePositiveProbability: targetFalsePositiveProbability,
resetFalsePositiveProbability: resetFalsePositiveProbability,

maxCount: bloom.EstimateCount(numHashes, numEntries, resetFalsePositiveProbability),
bloom: b,
salt: salt,
}, err
}

type BloomFilter struct {
bloom *bloom.Filter
minTargetElements int
targetFalsePositiveProbability float64
resetFalsePositiveProbability float64

maxCount int
bloom *bloom.Filter
// salt is provided to eventually unblock collisions in Bloom. It's possible
// that conflicting Gossipable items collide in the bloom filter, so a salt
// is generated to eventually resolve collisions.
Expand All @@ -53,29 +66,32 @@ func (b *BloomFilter) Has(gossipable Gossipable) bool {
return bloom.Contains(b.bloom, h[:], b.salt[:])
}

// TODO: Remove error from the return
func (b *BloomFilter) Marshal() ([]byte, []byte, error) {
func (b *BloomFilter) Marshal() ([]byte, []byte) {
bloomBytes := b.bloom.Marshal()
// salt must be copied here to ensure the bytes aren't overwritten if salt
// is later modified.
salt := b.salt
return bloomBytes, salt[:], nil
return bloomBytes, salt[:]
}

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a target false
// positive probability. Returns true if the bloom filter was reset.
// ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability].
//
// If [targetElements] exceeds [minTargetElements], the size of the bloom filter will grow to maintain
// the same [targetFalsePositiveProbability].
//
// Returns true if the bloom filter was reset.
func ResetBloomFilterIfNeeded(
bloomFilter *BloomFilter,
falsePositiveProbability float64,
targetElements int,
) (bool, error) {
numHashes, numEntries := bloomFilter.bloom.Parameters()
// TODO: Precalculate maxCount, as it is independent of the current state
// of the bloom filter.
maxCount := bloom.EstimateCount(numHashes, numEntries, falsePositiveProbability)
if bloomFilter.bloom.Count() < maxCount {
if bloomFilter.bloom.Count() <= bloomFilter.maxCount {
return false, nil
}

numHashes, numEntries := bloom.OptimalParameters(
ceyonur marked this conversation as resolved.
Show resolved Hide resolved
math.Max(bloomFilter.minTargetElements, targetElements),
bloomFilter.targetFalsePositiveProbability,
)
newBloom, err := bloom.New(numHashes, numEntries)
if err != nil {
return false, err
Expand All @@ -84,7 +100,7 @@ func ResetBloomFilterIfNeeded(
if err != nil {
return false, err
}

bloomFilter.maxCount = bloom.EstimateCount(numHashes, numEntries, bloomFilter.resetFalsePositiveProbability)
patrick-ogrady marked this conversation as resolved.
Show resolved Hide resolved
bloomFilter.bloom = newBloom
bloomFilter.salt = salt
return true, nil
Expand Down
55 changes: 40 additions & 15 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,83 @@ import (

func TestBloomFilterRefresh(t *testing.T) {
tests := []struct {
name string
falsePositiveProbability float64
add []*testTx
expected []*testTx
name string
minTargetElements int
targetFalsePositiveProbability float64
resetFalsePositiveProbability float64
reset bool
add []*testTx
expected []*testTx
}{
{
name: "no refresh",
falsePositiveProbability: 1,
name: "no refresh",
minTargetElements: 1,
targetFalsePositiveProbability: 0.01,
resetFalsePositiveProbability: 1,
reset: false, // maxCount = 9223372036854775807
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
{id: ids.ID{2}},
},
expected: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
{id: ids.ID{2}},
},
},
{
name: "refresh",
falsePositiveProbability: 0.1,
name: "refresh",
minTargetElements: 1,
targetFalsePositiveProbability: 0.01,
resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1
reset: true,
add: []*testTx{
{id: ids.ID{0}},
{id: ids.ID{1}},
{id: ids.ID{2}},
},
expected: []*testTx{
{id: ids.ID{1}},
{id: ids.ID{2}},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
b, err := bloom.New(1, 10)
numHashes, numEntries := bloom.OptimalParameters(
tt.minTargetElements,
tt.targetFalsePositiveProbability,
)
b, err := bloom.New(numHashes, numEntries)
require.NoError(err)
bloom := BloomFilter{
bloom: b,
bloom: b,
maxCount: bloom.EstimateCount(numHashes, numEntries, tt.resetFalsePositiveProbability),
minTargetElements: tt.minTargetElements,
targetFalsePositiveProbability: tt.targetFalsePositiveProbability,
resetFalsePositiveProbability: tt.resetFalsePositiveProbability,
}

var didReset bool
for _, item := range tt.add {
bloomBytes, saltBytes, err := bloom.Marshal()
require.NoError(err)

bloomBytes, saltBytes := bloom.Marshal()
initialBloomBytes := slices.Clone(bloomBytes)
initialSaltBytes := slices.Clone(saltBytes)

_, err = ResetBloomFilterIfNeeded(&bloom, tt.falsePositiveProbability)
reset, err := ResetBloomFilterIfNeeded(&bloom, len(tt.add))
require.NoError(err)
if reset {
didReset = reset
}
bloom.Add(item)

require.Equal(initialBloomBytes, bloomBytes)
require.Equal(initialSaltBytes, saltBytes)
}

require.Equal(tt.reset, didReset)
for _, expected := range tt.expected {
require.True(bloom.Has(expected))
}
Expand Down
6 changes: 1 addition & 5 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ type PullGossiper[T Gossipable] struct {
}

func (p *PullGossiper[_]) Gossip(ctx context.Context) error {
bloom, salt, err := p.set.GetFilter()
if err != nil {
return err
}

bloom, salt := p.set.GetFilter()
request := &sdk.PullGossipRequest{
Filter: bloom,
Salt: salt,
Expand Down
6 changes: 3 additions & 3 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TestGossiperGossip(t *testing.T) {
responseNetwork, err := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
require.NoError(err)

responseBloom, err := NewBloomFilter(1000, 0.01)
responseBloom, err := NewBloomFilter(1000, 0.01, 0.05)
require.NoError(err)
responseSet := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(err)
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

bloom, err := NewBloomFilter(1000, 0.01)
bloom, err := NewBloomFilter(1000, 0.01, 0.05)
require.NoError(err)
requestSet := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestPushGossipE2E(t *testing.T) {
knownTx := &testTx{id: ids.GenerateTestID()}

log := logging.NoLog{}
bloom, err := NewBloomFilter(100, 0.01)
bloom, err := NewBloomFilter(100, 0.01, 0.05)
require.NoError(err)
set := &testSet{
txs: make(map[ids.ID]*testTx),
Expand Down
2 changes: 1 addition & 1 deletion network/p2p/gossip/gossipable.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ type Set[T Gossipable] interface {
Iterate(f func(gossipable T) bool)
// GetFilter returns the byte representation of bloom filter and its
// corresponding salt.
GetFilter() (bloom []byte, salt []byte, err error)
GetFilter() (bloom []byte, salt []byte)
}
2 changes: 1 addition & 1 deletion network/p2p/gossip/test_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ func (t *testSet) Iterate(f func(gossipable *testTx) bool) {
}
}

func (t *testSet) GetFilter() ([]byte, []byte, error) {
func (t *testSet) GetFilter() ([]byte, []byte) {
return t.bloom.Marshal()
}
34 changes: 18 additions & 16 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ var (
_ gossip.Marshaller[*txs.Tx] = (*txParser)(nil)
)

// bloomChurnMultiplier is the number used to multiply the size of the mempool
// to determine how large of a bloom filter to create.
const bloomChurnMultiplier = 3
ceyonur marked this conversation as resolved.
Show resolved Hide resolved

// txGossipHandler is the handler called when serving gossip messages
type txGossipHandler struct {
p2p.NoOpHandler
Expand Down Expand Up @@ -64,27 +68,25 @@ func newGossipMempool(
log logging.Logger,
txVerifier TxVerifier,
parser txs.Parser,
maxExpectedElements int,
falsePositiveProbability,
maxFalsePositiveProbability float64,
minTargetElements int,
targetFalsePositiveProbability,
resetFalsePositiveProbability float64,
) (*gossipMempool, error) {
bloom, err := gossip.NewBloomFilter(maxExpectedElements, falsePositiveProbability)
bloom, err := gossip.NewBloomFilter(minTargetElements, targetFalsePositiveProbability, resetFalsePositiveProbability)
return &gossipMempool{
Mempool: mempool,
log: log,
txVerifier: txVerifier,
parser: parser,
maxFalsePositiveProbability: maxFalsePositiveProbability,
bloom: bloom,
Mempool: mempool,
log: log,
txVerifier: txVerifier,
parser: parser,
bloom: bloom,
}, err
}

type gossipMempool struct {
mempool.Mempool
log logging.Logger
txVerifier TxVerifier
parser txs.Parser
maxFalsePositiveProbability float64
log logging.Logger
txVerifier TxVerifier
parser txs.Parser

lock sync.RWMutex
bloom *gossip.BloomFilter
Expand Down Expand Up @@ -127,7 +129,7 @@ func (g *gossipMempool) AddVerified(tx *txs.Tx) error {
defer g.lock.Unlock()

g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.maxFalsePositiveProbability)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if err != nil {
return err
}
Expand All @@ -148,7 +150,7 @@ func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) {
g.Mempool.Iterate(f)
}

func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte, err error) {
func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) {
g.lock.RLock()
defer g.lock.RUnlock()

Expand Down
3 changes: 3 additions & 0 deletions vms/avm/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func TestNetworkAppGossip(t *testing.T) {
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil)
mempool.EXPECT().Add(gomock.Any()).Return(nil)
mempool.EXPECT().Len().Return(0)
mempool.EXPECT().RequestBuildBlock()
return mempool
},
Expand Down Expand Up @@ -303,6 +304,7 @@ func TestNetworkIssueTx(t *testing.T) {
mempool.EXPECT().Get(gomock.Any()).Return(nil, false)
mempool.EXPECT().GetDropReason(gomock.Any()).Return(nil)
mempool.EXPECT().Add(gomock.Any()).Return(nil)
mempool.EXPECT().Len().Return(0)
mempool.EXPECT().RequestBuildBlock()
return mempool
},
Expand Down Expand Up @@ -398,6 +400,7 @@ func TestNetworkIssueVerifiedTx(t *testing.T) {
mempoolFunc: func(ctrl *gomock.Controller) mempool.Mempool {
mempool := mempool.NewMockMempool(ctrl)
mempool.EXPECT().Add(gomock.Any()).Return(nil)
mempool.EXPECT().Len().Return(0)
mempool.EXPECT().RequestBuildBlock()
return mempool
},
Expand Down
Loading
Loading