diff --git a/posting/list.go b/posting/list.go index f5a47f677e7..b2a509432ae 100644 --- a/posting/list.go +++ b/posting/list.go @@ -19,10 +19,10 @@ package posting import ( "bytes" "context" - "fmt" "log" "math" "sort" + "sync" "github.com/dgryski/go-farm" @@ -279,7 +279,8 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { postingType = pb.Posting_REF } - return &pb.Posting{ + p := postingPool.Get().(*pb.Posting) + *p = pb.Posting{ Uid: t.ValueId, Value: t.Value, ValType: t.ValueType, @@ -289,6 +290,7 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { Op: op, Facets: t.Facets, } + return p } func hasDeleteAll(mpost *pb.Posting) bool { @@ -393,6 +395,26 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er return l.addMutationInternal(ctx, txn, t) } +var postingPool = &sync.Pool{ + New: func() interface{} { + return &pb.Posting{} + }, +} + +func (l *List) release() { + fromList := func(list *pb.PostingList) { + for _, p := range list.GetPostings() { + postingPool.Put(p) + } + } + fromList(l.plist) + for _, plist := range l.mutationMap { + fromList(plist) + } + l.plist = nil + l.mutationMap = nil +} + func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error { l.AssertLock() @@ -400,8 +422,11 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed return y.ErrConflict } - getKey := func(key []byte, uid uint64) string { - return fmt.Sprintf("%s|%d", key, uid) + getKey := func(key []byte, uid uint64) uint64 { + // Instead of creating a string first and then doing a fingerprint, let's do a fingerprint + // here to save memory allocations. + // Not entirely sure about effect on collision chances due to this simple XOR with uid. + return farm.Fingerprint64(key) ^ uid } mpost := NewPosting(t) @@ -415,7 +440,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed // We ensure that commit marks are applied to posting lists in the right // order. We can do so by proposing them in the same order as received by the Oracle delta // stream from Zero, instead of in goroutines. - var conflictKey string + var conflictKey uint64 pk := x.Parse(l.key) switch { case schema.State().HasUpsert(t.Attr): diff --git a/posting/lists.go b/posting/lists.go index cc803e25e5e..53364f25d5a 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -281,6 +281,7 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { lc.deltas[key] = data } lc.maxVersions[key] = pl.maxVersion() + pl.release() } lc.plists = make(map[string]*List) } diff --git a/posting/mvcc.go b/posting/mvcc.go index 14a32b3c706..6f46e8b57e7 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -27,7 +27,6 @@ import ( "github.com/dgraph-io/dgo/protos/api" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" - farm "github.com/dgryski/go-farm" "github.com/pkg/errors" ) @@ -44,13 +43,13 @@ func (txn *Txn) ShouldAbort() bool { return atomic.LoadUint32(&txn.shouldAbort) > 0 } -func (txn *Txn) addConflictKey(conflictKey string) { +func (txn *Txn) addConflictKey(conflictKey uint64) { txn.Lock() defer txn.Unlock() if txn.conflicts == nil { - txn.conflicts = make(map[string]struct{}) + txn.conflicts = make(map[uint64]struct{}) } - if len(conflictKey) > 0 { + if conflictKey > 0 { txn.conflicts[conflictKey] = struct{}{} } } @@ -63,7 +62,7 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) { // We don'txn need to send the whole conflict key to Zero. Solving #2338 // should be done by sending a list of mutating predicates to Zero, // along with the keys to be used for conflict detection. - fps := strconv.FormatUint(farm.Fingerprint64([]byte(key)), 36) + fps := strconv.FormatUint(key, 36) if !x.HasString(ctx.Keys, fps) { ctx.Keys = append(ctx.Keys, fps) } diff --git a/posting/oracle.go b/posting/oracle.go index adc4045b075..59619124851 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -54,7 +54,7 @@ type Txn struct { // Keeps track of conflict keys that should be used to determine if this // transaction conflicts with another. - conflicts map[string]struct{} + conflicts map[uint64]struct{} // Keeps track of last update wall clock. We use this fact later to // determine unhealthy, stale txns.