Skip to content

Commit

Permalink
Optimize uid allocation in live loader. (#5132)
Browse files Browse the repository at this point in the history
The live loader is having trouble loading exported data with the
existing uids because there are too many requests for new uids.
The current version requests new uids to be allocated for every uid
greater than the maximum seen so far. In the exported data, the uids can
come in increasing order, which triggers a new allocation request with
every NQuad.

This PR changes the code to pre-allocate the uids, once per batch of
NQuad received from the NQuad buffer channel.

Tested it with the 1 million movie data set and now I am getting times
similar to the live loader with the --new_uids option enabled.

Fixes #4996
  • Loading branch information
martinmr authored Apr 9, 2020
1 parent 1ec1ffd commit dac00bd
Showing 1 changed file with 30 additions and 1 deletion.
31 changes: 30 additions & 1 deletion dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ func (l *loader) uid(val string) string {
// later to another node. It is up to the user to avoid this.
if !opt.newUids {
if uid, err := strconv.ParseUint(val, 0, 64); err == nil {
l.alloc.BumpTo(uid)
return fmt.Sprintf("%#x", uid)
}
}
Expand All @@ -214,6 +213,34 @@ func (l *loader) uid(val string) string {
return fmt.Sprintf("%#x", uint64(uid))
}

// allocateUids scans the given NQuads for the largest numeric uid appearing as
// either a subject or an object id, and bumps the loader's uid allocator to
// that value. Doing this once per batch avoids issuing an allocation request
// for every single NQuad when uids arrive in increasing order.
// It is a no-op when --new_uids is set, since existing uids are ignored then.
func (l *loader) allocateUids(nqs []*api.NQuad) {
	if opt.newUids {
		return
	}

	var maxUid uint64
	for _, nq := range nqs {
		// Check the subject and the object id independently. A parse failure
		// on the subject (e.g. a blank node like "_:x") must not prevent a
		// valid object uid on the same NQuad from being considered.
		if sUid, err := strconv.ParseUint(nq.Subject, 0, 64); err == nil && sUid > maxUid {
			maxUid = sUid
		}
		if oUid, err := strconv.ParseUint(nq.ObjectId, 0, 64); err == nil && oUid > maxUid {
			maxUid = oUid
		}
	}
	// BumpTo is monotonic, so bumping with the batch maximum (possibly zero)
	// is safe even when no numeric uids were found.
	l.alloc.BumpTo(maxUid)
}

// processFile forwards a file to the RDF or JSON processor as appropriate
func (l *loader) processFile(ctx context.Context, filename string) error {
fmt.Printf("Processing data file %q\n", filename)
Expand Down Expand Up @@ -281,6 +308,8 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
if len(nqs) == 0 {
continue
}

l.allocateUids(nqs)
for _, nq := range nqs {
nq.Subject = l.uid(nq.Subject)
if len(nq.ObjectId) > 0 {
Expand Down

0 comments on commit dac00bd

Please sign in to comment.