From 39dcaa2d12a7438fbb339ee4ff773e6fc7af8b87 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 6 Aug 2019 17:43:17 -0700 Subject: [PATCH 1/5] Let's reuse postings --- posting/list.go | 26 +++++++++++++++++++++++++- posting/lists.go | 1 + 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index f5a47f677e7..10b38de9501 100644 --- a/posting/list.go +++ b/posting/list.go @@ -23,6 +23,7 @@ import ( "log" "math" "sort" + "sync" "github.com/dgryski/go-farm" @@ -279,7 +280,9 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { postingType = pb.Posting_REF } - return &pb.Posting{ + // p := &pb.Posting{} + p := postingPool.Get().(*pb.Posting) + *p = pb.Posting{ Uid: t.ValueId, Value: t.Value, ValType: t.ValueType, @@ -289,6 +292,7 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { Op: op, Facets: t.Facets, } + return p } func hasDeleteAll(mpost *pb.Posting) bool { @@ -393,6 +397,26 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er return l.addMutationInternal(ctx, txn, t) } +var postingPool = &sync.Pool{ + New: func() interface{} { + return &pb.Posting{} + }, +} + +func (l *List) Release() { + fromList := func(list *pb.PostingList) { + for _, p := range list.GetPostings() { + postingPool.Put(p) + } + } + fromList(l.plist) + for _, plist := range l.mutationMap { + fromList(plist) + } + l.plist = nil + l.mutationMap = nil +} + func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error { l.AssertLock() diff --git a/posting/lists.go b/posting/lists.go index cc803e25e5e..8631c7dea17 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -281,6 +281,7 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { lc.deltas[key] = data } lc.maxVersions[key] = pl.maxVersion() + pl.Release() } lc.plists = make(map[string]*List) } From 6a0e5342be8bd86e43a37e1f8fa74c9223bbb916 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 6 Aug 2019 18:36:27 -0700 Subject: [PATCH 2/5] Do not use fmt.Sprintf to generate collision key. Instead use and store fingerprints directly. --- posting/list.go | 10 ++++++---- posting/mvcc.go | 9 ++++----- posting/oracle.go | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/posting/list.go b/posting/list.go index 10b38de9501..664a16269cb 100644 --- a/posting/list.go +++ b/posting/list.go @@ -19,7 +19,6 @@ package posting import ( "bytes" "context" - "fmt" "log" "math" "sort" @@ -424,8 +423,11 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed return y.ErrConflict } - getKey := func(key []byte, uid uint64) string { - return fmt.Sprintf("%s|%d", key, uid) + getKey := func(key []byte, uid uint64) uint64 { + // Instead of creating a string first and then doing a fingerprint, let's do a fingerprint + // here to save memory allocations. + // Not entirely sure about effect on collision chances due to this simple XOR with uid. + return farm.Fingerprint64(key) ^ uid } mpost := NewPosting(t) @@ -439,7 +441,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed // We ensure that commit marks are applied to posting lists in the right // order. We can do so by proposing them in the same order as received by the Oracle delta // stream from Zero, instead of in goroutines. - var conflictKey string + var conflictKey uint64 pk := x.Parse(l.key) switch { case schema.State().HasUpsert(t.Attr): diff --git a/posting/mvcc.go b/posting/mvcc.go index 14a32b3c706..6f46e8b57e7 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" - farm "github.com/dgryski/go-farm" "github.com/pkg/errors" ) @@ -44,13 +43,13 @@ func (txn *Txn) ShouldAbort() bool { return atomic.LoadUint32(&txn.shouldAbort) > 0 } -func (txn *Txn) addConflictKey(conflictKey string) { +func (txn *Txn) addConflictKey(conflictKey uint64) { txn.Lock() defer txn.Unlock() if txn.conflicts == nil { - txn.conflicts = make(map[string]struct{}) + txn.conflicts = make(map[uint64]struct{}) } - if len(conflictKey) > 0 { + if conflictKey > 0 { txn.conflicts[conflictKey] = struct{}{} } } @@ -63,7 +62,7 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { // We don'txn need to send the whole conflict key to Zero. Solving #2338 // should be done by sending a list of mutating predicates to Zero, // along with the keys to be used for conflict detection. - fps := strconv.FormatUint(farm.Fingerprint64([]byte(key)), 36) + fps := strconv.FormatUint(key, 36) if !x.HasString(ctx.Keys, fps) { ctx.Keys = append(ctx.Keys, fps) } diff --git a/posting/oracle.go b/posting/oracle.go index adc4045b075..59619124851 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -54,7 +54,7 @@ type Txn struct { // Keeps track of conflict keys that should be used to determine if this // transaction conflicts with another. - conflicts map[string]struct{} + conflicts map[uint64]struct{} // Keeps track of last update wall clock. We use this fact later to // determine unhealthy, stale txns. From b563eb69fde8121bd871ea5ae13973ef0ec23132 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 6 Aug 2019 18:49:59 -0700 Subject: [PATCH 3/5] Reset pb.MapEntry --- dgraph/cmd/bulk/mapper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dgraph/cmd/bulk/mapper.go b/dgraph/cmd/bulk/mapper.go index 0d822f290a8..cc0e606fa01 100644 --- a/dgraph/cmd/bulk/mapper.go +++ b/dgraph/cmd/bulk/mapper.go @@ -190,6 +190,7 @@ func (m *mapper) addMapEntry(key []byte, p *pb.Posting, shard int) { atomic.AddInt64(&m.prog.mapEdgeCount, 1) me := m.mePool.Get().(*pb.MapEntry) + *me = pb.MapEntry{} me.Key = key if p.PostingType != pb.Posting_REF || len(p.Facets) > 0 { From 95eaa41f9d6c3628a73754422b38de7e7312d484 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Tue, 6 Aug 2019 18:55:29 -0700 Subject: [PATCH 4/5] Remove comment --- posting/list.go | 1 - 1 file changed, 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index 664a16269cb..3393c18ad1d 100644 --- a/posting/list.go +++ b/posting/list.go @@ -279,7 +279,6 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { postingType = pb.Posting_REF } - // p := &pb.Posting{} p := postingPool.Get().(*pb.Posting) *p = pb.Posting{ Uid: t.ValueId, From 99731bf6739103c4c87a32fe3278f9d0afb0e518 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 7 Aug 2019 19:17:06 -0700 Subject: [PATCH 5/5] Make release private --- posting/list.go | 2 +- posting/lists.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/posting/list.go b/posting/list.go index 3393c18ad1d..b2a509432ae 100644 --- a/posting/list.go +++ b/posting/list.go @@ -401,7 +401,7 @@ var postingPool = &sync.Pool{ }, } -func (l *List) Release() { +func (l *List) release() { fromList := func(list *pb.PostingList) { for _, p := range list.GetPostings() { postingPool.Put(p) diff --git a/posting/lists.go b/posting/lists.go index 8631c7dea17..53364f25d5a 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -281,7 +281,7 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { lc.deltas[key] = data } lc.maxVersions[key] = pl.maxVersion() - pl.Release() + pl.release() } lc.plists = make(map[string]*List) }