From 9ddb8066c5af40b2c33f75f248541e815ce5adf0 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 25 Dec 2019 13:02:36 +0530 Subject: [PATCH 1/8] Replace global rand lock with local one --- skl/skl.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/skl/skl.go b/skl/skl.go index cdfc599be..7b32c0065 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -36,6 +36,7 @@ import ( "math" "math/rand" "sync/atomic" + "time" "unsafe" "github.com/dgraph-io/badger/v2/y" @@ -79,6 +80,7 @@ type Skiplist struct { head *node ref int32 arena *Arena + rng rand.Source } // IncrRef increases the refcount @@ -132,6 +134,7 @@ func NewSkiplist(arenaSize int64) *Skiplist { head: head, arena: arena, ref: 1, + rng: rand.NewSource(time.Now().UnixNano()), } } @@ -165,9 +168,9 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { // return n != nil && y.CompareKeys(key, n.key) > 0 //} -func randomHeight() int { +func (s *Skiplist) randomHeight() int { h := 1 - for h < maxHeight && rand.Uint32() <= heightIncrease { + for h < maxHeight && uint32(s.rng.Int63()) <= heightIncrease { h++ } return h @@ -300,7 +303,7 @@ func (s *Skiplist) Put(key []byte, v y.ValueStruct) { } // We do need to create a new node. - height := randomHeight() + height := s.randomHeight() x := newNode(s.arena, key, v, height) // Try to increase s.height via CAS. From 8a97b12049ac9799283aba2330de8dcfb1bf0e72 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 6 Jan 2020 14:29:45 +0530 Subject: [PATCH 2/8] Use rand.Rand instead of rand.Source --- skl/skl.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/skl/skl.go b/skl/skl.go index 7b32c0065..a06951dd9 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -80,7 +80,7 @@ type Skiplist struct { head *node ref int32 arena *Arena - rng rand.Source + rand *rand.Rand } // IncrRef increases the refcount @@ -134,7 +134,7 @@ func NewSkiplist(arenaSize int64) *Skiplist { head: head, arena: arena, ref: 1, - rng: rand.NewSource(time.Now().UnixNano()), + rand: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -170,7 +170,7 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { func (s *Skiplist) randomHeight() int { h := 1 - for h < maxHeight && uint32(s.rng.Int63()) <= heightIncrease { + for h < maxHeight && s.rand.Uint32() <= heightIncrease { h++ } return h From 5be7f769bf1e595c9add4b2537f21ca5249a50f9 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Wed, 8 Jan 2020 15:14:34 +0530 Subject: [PATCH 3/8] Add comment --- skl/skl.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/skl/skl.go b/skl/skl.go index a06951dd9..0ca9a1f7a 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -170,6 +170,8 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { func (s *Skiplist) randomHeight() int { h := 1 + // rand.Uint32() is not thread safe. Currently, all writes to memtable are sequential + // so it should be okay to use rand.Uint32. for h < maxHeight && s.rand.Uint32() <= heightIncrease { h++ } From c25d2b275013296db15aab57bbeb8bf26ac832bc Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 10 Jan 2020 15:55:58 +0530 Subject: [PATCH 4/8] Use fastRand instead of rand.New(..) --- skl/skl.go | 9 ++------- skl/skl_test.go | 2 +- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/skl/skl.go b/skl/skl.go index 0ca9a1f7a..43694f14b 100644 --- a/skl/skl.go +++ b/skl/skl.go @@ -34,12 +34,11 @@ package skl import ( "math" - "math/rand" "sync/atomic" - "time" "unsafe" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" ) const ( @@ -80,7 +79,6 @@ type Skiplist struct { head *node ref int32 arena *Arena - rand *rand.Rand } // IncrRef increases the refcount @@ -134,7 +132,6 @@ func NewSkiplist(arenaSize int64) *Skiplist { head: head, arena: arena, ref: 1, - rand: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -170,9 +167,7 @@ func (s *node) casNextOffset(h int, old, val uint32) bool { func (s *Skiplist) randomHeight() int { h := 1 - // rand.Uint32() is not thread safe. Currently, all writes to memtable are sequential - // so it should be okay to use rand.Uint32. - for h < maxHeight && s.rand.Uint32() <= heightIncrease { + for h < maxHeight && z.FastRand() <= heightIncrease { h++ } return h diff --git a/skl/skl_test.go b/skl/skl_test.go index 6bd075862..4d4959222 100644 --- a/skl/skl_test.go +++ b/skl/skl_test.go @@ -499,7 +499,7 @@ func BenchmarkReadWriteMap(b *testing.B) { b.RunParallel(func(pb *testing.PB) { rng := rand.New(rand.NewSource(time.Now().UnixNano())) for pb.Next() { - if rand.Float32() < readFrac { + if rng.Float32() < readFrac { mutex.RLock() _, ok := m[string(randomKey(rng))] mutex.RUnlock() From baffaf0c12d45f5d44f7ec762bbbba9bbe03985f Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 10 Jan 2020 16:04:25 +0530 Subject: [PATCH 5/8] Add Concurrent Write benchmark --- skl/skl_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/skl/skl_test.go b/skl/skl_test.go index 4d4959222..0be7a64e4 100644 --- a/skl/skl_test.go +++ b/skl/skl_test.go @@ -516,3 +516,16 @@ func BenchmarkReadWriteMap(b *testing.B) { }) } } + +func BenchmarkWrite(b *testing.B) { + value := newValue(123) + l := NewSkiplist(int64((b.N + 1) * MaxNodeSize)) + defer l.DecrRef() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + for pb.Next() { + l.Put(randomKey(rng), y.ValueStruct{Value: value, Meta: 0, UserMeta: 0}) + } + }) +} From e9dee7969204569bf8223415731dd06368776c67 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 13 Jan 2020 20:25:07 +0530 Subject: [PATCH 6/8] fix table checksum test --- table/table.go | 2 ++ table/table_test.go | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/table/table.go b/table/table.go index d68169384..962101783 100644 --- a/table/table.go +++ b/table/table.go @@ -232,6 +232,7 @@ func OpenTable(fd *os.File, opts Options) (*Table, error) { if err := t.initBiggestAndSmallest(); err != nil { return nil, errors.Wrapf(err, "failed to initialize table") } + if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead { if err := t.VerifyChecksum(); err != nil { _ = fd.Close() @@ -316,6 +317,7 @@ func (t *Table) readNoFail(off, sz int) []byte { func (t *Table) readIndex() error { readPos := t.tableSize + y.AssertTruef(readPos > 0, "readPos less than zero: %d", readPos) // Read checksum len from the last 4 bytes. readPos -= 4 buf := t.readNoFail(readPos, 4) diff --git a/table/table_test.go b/table/table_test.go index 82bddf591..7aa47d547 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -743,7 +743,10 @@ func TestTableChecksum(t *testing.T) { f := buildTestTable(t, "k", 10000, opts) fi, err := f.Stat() require.NoError(t, err, "unable to get file information") - f.WriteAt(rb, rand.Int63n(fi.Size())) + // Write random bytes at random location. + n, err := f.WriteAt(rb, rand.Int63n(fi.Size())) + require.NoError(t, err) + require.Equal(t, n, len(rb)) _, err = OpenTable(f, opts) if err == nil || !strings.Contains(err.Error(), "checksum") { From 4127ceae5af0232cf336a5364659d422f898f09f Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 13 Jan 2020 22:46:27 +0530 Subject: [PATCH 7/8] Return error instead of assert --- table/table.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/table/table.go b/table/table.go index 962101783..f50884b6c 100644 --- a/table/table.go +++ b/table/table.go @@ -317,7 +317,9 @@ func (t *Table) readNoFail(off, sz int) []byte { func (t *Table) readIndex() error { readPos := t.tableSize - y.AssertTruef(readPos > 0, "readPos less than zero: %d", readPos) + if readPos <= 0 { + return errors.New("readPos less than zero. Data corrupted") + } // Read checksum len from the last 4 bytes. readPos -= 4 buf := t.readNoFail(readPos, 4) From c9ed8f29105c2b9c91d21ea5ff5bcc337ef80893 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 14 Jan 2020 16:32:14 +0530 Subject: [PATCH 8/8] fixup --- table/table.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/table/table.go b/table/table.go index f50884b6c..fc20be7c0 100644 --- a/table/table.go +++ b/table/table.go @@ -317,13 +317,13 @@ func (t *Table) readNoFail(off, sz int) []byte { func (t *Table) readIndex() error { readPos := t.tableSize - if readPos <= 0 { - return errors.New("readPos less than zero. Data corrupted") - } // Read checksum len from the last 4 bytes. readPos -= 4 buf := t.readNoFail(readPos, 4) checksumLen := int(y.BytesToU32(buf)) + if checksumLen < 0 { + return errors.New("checksum length less than zero. Data corrupted") + } // Read checksum. expectedChk := &pb.Checksum{}