Skip to content

Commit

Permalink
Introduce DecodeFrom and EncodeReader
Browse files Browse the repository at this point in the history
The buckets byte slice is the biggest part of the memory used by the filter,
and might be several GBs.

Common usage of a filter is in an environment with limited RAM size
based on the filter size, load it to memory on startup and dump it to
disk on teardown.
Currently the Encode and Decode methods duplicate the byte slice,
which makes the memory usage at loading and dumping time (at
least) twice the filter size.

This commit introduces a new method for dumping the filter using a reader
of the internal byte slice, and a method for loading the filter based on
already fetched encoded bytes (from disk, network) and using them
internally instead of making a copy.

+ formatting.
  • Loading branch information
EladGabay committed Oct 9, 2021
1 parent 8548deb commit a2a1889
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 137 deletions.
100 changes: 62 additions & 38 deletions cuckoofilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,27 @@
package cuckoo

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"

"github.com/dgryski/go-metro"
)

// maximum number of cuckoo kicks before claiming failure
const kMaxCuckooCount uint = 500

const (
//TableTypeSingle normal single table
// TableTypeSingle normal single table
TableTypeSingle = 0
//TableTypePacked packed table, use semi-sort to save 1 bit per item
// TableTypePacked packed table, use semi-sort to save 1 bit per item
TableTypePacked = 1
)

type table interface {
Init(tagsPerBucket, bitsPerTag, num uint)
Init(tagsPerBucket, bitsPerTag, num uint, initialBucketsHint []byte) error
NumBuckets() uint
FindTagInBuckets(i1, i2 uint, tag uint32) bool
DeleteTagFromBucket(i uint, tag uint32) bool
Expand All @@ -32,7 +35,8 @@ type table interface {
SizeInBytes() uint
Info() string
BitsPerItem() uint
Encode() []byte
EncodedSizeInBytes() uint
Reader() io.Reader
Decode([]byte) error
Reset()
}
Expand All @@ -52,7 +56,9 @@ type victimCache struct {
used bool
}

//Filter cuckoo filter type struct
const filterMetadataSize = 3*bytesPerUint32 + 1

// Filter cuckoo filter type struct
type Filter struct {
victim victimCache
numItems uint
Expand All @@ -75,7 +81,7 @@ func NewFilter(tagsPerBucket, bitsPerItem, maxNumKeys, tableType uint) *Filter {
numBuckets = 1
}
table := getTable(tableType).(table)
table.Init(tagsPerBucket, bitsPerItem, numBuckets)
_ = table.Init(tagsPerBucket, bitsPerItem, numBuckets, nil)
return &Filter{
table: table,
}
Expand All @@ -102,7 +108,7 @@ func (f *Filter) altIndex(index uint, tag uint32) uint {
return f.indexHash(uint32(index) ^ (tag * 0x5bd1e995))
}

//Size return num of items that filter store
// Size return num of items that filter store
func (f *Filter) Size() uint {
var c uint
if f.victim.used {
Expand All @@ -111,22 +117,27 @@ func (f *Filter) Size() uint {
return f.numItems + c
}

//LoadFactor return current filter's loadFactor
// LoadFactor return current filter's loadFactor
func (f *Filter) LoadFactor() float64 {
return 1.0 * float64(f.Size()) / float64(f.table.SizeInTags())
}

//SizeInBytes return bytes occupancy of filter's table
// SizeInBytes return bytes occupancy of filter's table
func (f *Filter) SizeInBytes() uint {
return f.table.SizeInBytes()
}

//BitsPerItem return bits occupancy per item of filter's table
// EncodedSizeInBytes return bytes occupancy of filter including metadata
func (f *Filter) EncodedSizeInBytes() uint {
return f.table.EncodedSizeInBytes() + filterMetadataSize
}

// BitsPerItem return bits occupancy per item of filter's table
func (f *Filter) BitsPerItem() float64 {
return 8.0 * float64(f.table.SizeInBytes()) / float64(f.Size())
}

//Add add an item into filter, return false when filter is full
// Add add an item into filter, return false when filter is full
func (f *Filter) Add(item []byte) bool {
if f.victim.used {
return false
Expand All @@ -135,7 +146,7 @@ func (f *Filter) Add(item []byte) bool {
return f.addImpl(i, tag)
}

//AddUnique add an item into filter, return false when filter already contains it or filter is full
// AddUnique add an item into filter, return false when filter already contains it or filter is full
func (f *Filter) AddUnique(item []byte) bool {
if f.Contain(item) {
return false
Expand Down Expand Up @@ -169,7 +180,7 @@ func (f *Filter) addImpl(i uint, tag uint32) bool {
return true
}

//Contain return if filter contains an item
// Contain return if filter contains an item
func (f *Filter) Contain(key []byte) bool {
i1, tag := f.generateIndexTagHash(key)
i2 := f.altIndex(i1, tag)
Expand All @@ -182,7 +193,7 @@ func (f *Filter) Contain(key []byte) bool {
return false
}

//Delete delete item from filter, return false when item not exist
// Delete delete item from filter, return false when item not exist
func (f *Filter) Delete(key []byte) bool {
i1, tag := f.generateIndexTagHash(key)
i2 := f.altIndex(i1, tag)
Expand Down Expand Up @@ -238,7 +249,7 @@ func (f *Filter) FalsePositiveRate() float64 {
return float64(fp) / float64(rounds)
}

//Info return filter's detail info
// Info return filter's detail info
func (f *Filter) Info() string {
return fmt.Sprintf("CuckooFilter Status:\n"+
"\t\t%v\n"+
Expand All @@ -250,37 +261,50 @@ func (f *Filter) Info() string {
}

// Encode returns a byte slice representing a Cuckoo filter
func (f *Filter) Encode() []byte {
var b [3][bytesPerUint32]byte
binary.LittleEndian.PutUint32(b[0][:], uint32(f.numItems))
binary.LittleEndian.PutUint32(b[1][:], uint32(f.victim.index))
binary.LittleEndian.PutUint32(b[2][:], f.victim.tag)

ret := append(b[0][:], b[1][:]...)
ret = append(ret, b[2][:]...)
func (f *Filter) Encode() ([]byte, error) {
	// Materialize the encoding stream into a single, exactly-sized buffer,
	// so callers get a slice whose length equals EncodedSizeInBytes.
	out := make([]byte, f.EncodedSizeInBytes())
	r := f.EncodeReader()
	if _, err := io.ReadFull(r, out); err != nil {
		return nil, err
	}
	return out, nil
}

// EncodeReader returns a reader representing a Cuckoo filter.
// The stream is a fixed-size metadata header (numItems, victim index,
// victim tag, victim-used flag) followed by the table's own encoding,
// which lets callers dump the filter without duplicating the bucket bytes.
func (f *Filter) EncodeReader() io.Reader {
	var metadata [filterMetadataSize]byte

	// Lay out the three uint32 fields back to back in little-endian order.
	for i, n := range []uint32{uint32(f.numItems), uint32(f.victim.index), f.victim.tag} {
		binary.LittleEndian.PutUint32(metadata[i*bytesPerUint32:], n)
	}

	victimUsed := byte(0)
	if f.victim.used {
		victimUsed = byte(1)
	}
	metadata[bytesPerUint32*3] = victimUsed

	// Stream the header and the table bytes without concatenating them.
	return io.MultiReader(bytes.NewReader(metadata[:]), f.table.Reader())
}

return ret
// Decode returns a Cuckoo Filter using a copy of the provided byte slice.
func Decode(b []byte) (*Filter, error) {
	// Clone the input so the decoded filter does not alias the caller's memory.
	dup := append(make([]byte, 0, len(b)), b...)
	return DecodeFrom(dup)
}

// Decode returns a Cuckoo Filter from a byte slice
func Decode(bytes []byte) (*Filter, error) {
if len(bytes) < 20 {
// DecodeFrom returns a Cuckoo Filter using the exact provided byte slice (no copy).
func DecodeFrom(b []byte) (*Filter, error) {
if len(b) < 20 {
return nil, errors.New("unexpected bytes length")
}
numItems := uint(binary.LittleEndian.Uint32(bytes[0:4]))
curIndex := uint(binary.LittleEndian.Uint32(bytes[4:8]))
curTag := binary.LittleEndian.Uint32(bytes[8:12])
used := bytes[12] == byte(1)
tableType := uint(bytes[13])
numItems := uint(binary.LittleEndian.Uint32(b[0*bytesPerUint32:]))
curIndex := uint(binary.LittleEndian.Uint32(b[1*bytesPerUint32:]))
curTag := binary.LittleEndian.Uint32(b[2*1*bytesPerUint32:])
used := b[12] == byte(1)
tableType := uint(b[13])
table := getTable(tableType).(table)
err := table.Decode(bytes[13:])
if err != nil {
if err := table.Decode(b[13:]); err != nil {
return nil, err
}
return &Filter{
Expand Down
60 changes: 43 additions & 17 deletions cuckoofilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package cuckoo

import (
"bytes"
"crypto/rand"
"fmt"
"io"
Expand All @@ -15,9 +16,11 @@ import (

const size = 100000

var testBucketSize = []uint{2, 4, 8}
var testFingerprintSize = []uint{2, 4, 5, 6, 7, 8, 9, 10, 12, 13, 16, 17, 23, 31, 32}
var testTableType = []uint{TableTypeSingle, TableTypePacked}
var (
testBucketSize = []uint{2, 4, 8}
testFingerprintSize = []uint{2, 4, 5, 6, 7, 8, 9, 10, 12, 13, 16, 17, 23, 31, 32}
testTableType = []uint{TableTypeSingle, TableTypePacked}
)

func TestFilter(t *testing.T) {
var insertNum uint = 50000
Expand All @@ -33,7 +36,7 @@ func TestFilter(t *testing.T) {
continue
}
cf := NewFilter(b, f, 8190, table)
//fmt.Println(cf.Info())
// fmt.Println(cf.Info())
a := make([][]byte, 0)
for i := uint(0); i < insertNum; i++ {
_, _ = io.ReadFull(rand.Reader, hash[:])
Expand All @@ -45,12 +48,47 @@ func TestFilter(t *testing.T) {
}

count := cf.Size()

if count != uint(len(a)) {
t.Errorf("Expected count = %d, instead count = %d, b %v f %v", uint(len(a)), count, b, f)
return
}

encodedBytes, err := cf.Encode()
if err != nil {
t.Fatalf("err %v", err)
}
if len(encodedBytes) != cap(encodedBytes) {
t.Fatalf("len(%d) != cap(%d)", len(encodedBytes), cap(encodedBytes))
}
ncf, err := Decode(encodedBytes)
if err != nil || !reflect.DeepEqual(cf, ncf) {
t.Errorf("Expected epual, err %v", err)
return
}

encodedBytes, err = cf.Encode()
if err != nil {
t.Fatalf("err %v", err)
}
ncf, err = DecodeFrom(encodedBytes)
if err != nil || !reflect.DeepEqual(cf, ncf) {
t.Errorf("Expected epual, err %v", err)
return
}

bytesFromReader, err := io.ReadAll(cf.EncodeReader())
if err != nil {
t.Fatalf("Error reading from reader")
}
if !bytes.Equal(bytesFromReader, encodedBytes) {
t.Fatalf("Expected to be equal")
}

fmt.Println(cf.Info())
cf.BitsPerItem()
cf.SizeInBytes()
cf.LoadFactor()

for _, v := range a {
if !cf.Contain(v) {
t.Errorf("Expected contain, instead not contain, b %v f %v table type %v", b, f, table)
Expand All @@ -65,22 +103,10 @@ func TestFilter(t *testing.T) {
return
}

bytes := cf.Encode()
ncf, err := Decode(bytes)
if err != nil || !reflect.DeepEqual(cf, ncf) {
t.Errorf("Expected epual, err %v", err)
return
}

cf.Info()
cf.BitsPerItem()
cf.SizeInBytes()
cf.LoadFactor()
fmt.Printf("Filter bucketSize %v fingerprintSize %v tableType %v falsePositive Rate %v \n", b, f, table, cf.FalsePositiveRate())
}
}
}

}

func BenchmarkFilterSingle_Reset(b *testing.B) {
Expand Down
Loading

0 comments on commit a2a1889

Please sign in to comment.