Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/scheduler #1372

Merged
merged 4 commits into from
Sep 1, 2017
Merged

Feature/scheduler #1372

merged 4 commits into from
Sep 1, 2017

Conversation

janardhan1993
Copy link
Contributor

@janardhan1993 janardhan1993 commented Aug 29, 2017

Scheduler ensures that del and set for same uid,predicate pair always run in fifo order, schema mutations only blocks mtuations for corresponding predicates, if schema is not specified mutations for a given predicate wait on the first mutation.

TODO: Add Tests


This change is Reviewable

@manishrjain
Copy link
Contributor

Review status: 0 of 6 files reviewed at latest revision, 13 unresolved discussions, some commit checks failed.


worker/draft.go, line 138 at r1 (raw file):

	p.Lock()
	defer p.Unlock()
	if pd, has := p.ids[pid]; has {

This can be asserted.


worker/draft.go, line 924 at r1 (raw file):

			// since dependencies between tasks are tracked at task level and schema mutations
			// can take a long time.
			if proposal.Mutations.Schema != nil {

Scheduler can do this before scheduling it concurrently. It should see this serially first.


worker/draft.go, line 49 at r2 (raw file):

const (
	opSet           byte = 0x01

move to scheduler.go.


worker/draft.go, line 172 at r2 (raw file):

		return
	}
	pd.ch <- pd.err

set err := pd.err while you still have the lock. Otherwise, this is a race cond.


worker/draft.go, line 190 at r2 (raw file):

	id              uint32
	ridx            uint64 // raft index corresponding to the task
	numDependencies int32

Add a comment about this.


worker/draft.go, line 202 at r2 (raw file):

// 5. If the task is too big scheduler can decide to breakup the task into smaller
// chunks and run them in parallel.
type schedulerCtx struct {

Move this to worker/scheduler.go


worker/draft.go, line 282 at r2 (raw file):

func taskKey(op byte, attribute string, uid uint64) string {
	return fmt.Sprintf("%s%d%s%d", op, len(attribute), attribute, uid)

fmt.Sprintf("%s|%d", attribute, uid)


posting/index.go, line 427 at r1 (raw file):

	err := lcache.clear(func(key []byte) bool {
		pk := x.Parse(key)
		if pk.Attr == attr && pk.IsType(x.ByteReverse) {

Create a helper function, which you can pass directly here; and reuse.


posting/index.go, line 780 at r1 (raw file):

func hasToken(attr string, uid uint64, token string) (bool, error) {
	// GetOrCreate might be better for larger texts ?

Remove this as well.


posting/index.go, line 806 at r1 (raw file):

// We can remove it if underlying store supports atomic writes
// Since we take lock over indexPl, ideally there shouldn't be any race conditions
func RemoveStaleIndices(ctx context.Context, attr string) error {

Don't make it part of this PR.


posting/lists.go, line 407 at r1 (raw file):

}

// This doesn't sync, so call this only when you don't care about dirty pl's in

s/pl's/posting lists/g


posting/lru.go, line 177 at r1 (raw file):

}

func (c *listCache) clear(f func(key []byte) bool) error {

s/f/remove


posting/lru.go, line 183 at r1 (raw file):

		kv := e.Value.(*entry)
		shouldRemove := f(kv.pl.key)
		if !shouldRemove {

if !remove(kv.pl.key) {
}


Comments from Reviewable

@janardhan1993 janardhan1993 force-pushed the feature/scheduler branch 2 times, most recently from 096f296 to 90dbe2e Compare September 1, 2017 00:14
@janardhan1993
Copy link
Contributor Author

Review status: 0 of 7 files reviewed at latest revision, 13 unresolved discussions.


worker/draft.go, line 138 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

This can be asserted.

Done.


worker/draft.go, line 924 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Scheduler can do this before scheduling it concurrently. It should see this serially first.

Done.


worker/draft.go, line 49 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

move to scheduler.go.

Done.


worker/draft.go, line 172 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

set err := pd.err while you still have the lock. Otherwise, this is a race cond.

Done.


worker/draft.go, line 190 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Add a comment about this.

Done.


worker/draft.go, line 202 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Move this to worker/scheduler.go

Done.


worker/draft.go, line 282 at r2 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

fmt.Sprintf("%s|%d", attribute, uid)

Done.


posting/index.go, line 427 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Create a helper function, which you can pass directly here; and reuse.

Done. In other PR


posting/index.go, line 780 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Remove this as well.

Done.


posting/index.go, line 806 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Don't make it part of this PR.

Done.


posting/lists.go, line 407 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

s/pl's/posting lists/g

Done.


posting/lru.go, line 177 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

s/f/remove

Done.


posting/lru.go, line 183 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

if !remove(kv.pl.key) {
}

Done.


Comments from Reviewable

@manishrjain
Copy link
Contributor

Review status: 0 of 7 files reviewed at latest revision, 15 unresolved discussions.


worker/draft.go, line 630 at r4 (raw file):

			// ensures that index is not mark completed until all tasks
			// are submitted to scheduler
			n.props.IncRef(proposal.Id, 1)

This ref tracking should be done by the scheduler.


worker/scheduler.go, line 45 at r4 (raw file):

	tasks map[uint32][]*task
	tch   chan *task
	ptch  chan *task // priority task channel

No need.


worker/scheduler.go, line 52 at r4 (raw file):

	// if a mutation comes which is using the predicate. Dgraph would be
	// under heavy load while doing schema mutations so it should be ok to block.
	schemaMap     map[string]chan struct{}

Yeah, maybe just block everything.


worker/scheduler.go, line 61 at r4 (raw file):

	s.n = n
	s.tasks = make(map[uint32][]*task)
	s.tch = make(chan *task, 10000)

1000 should be enough.


worker/scheduler.go, line 62 at r4 (raw file):

	s.tasks = make(map[uint32][]*task)
	s.tch = make(chan *task, 10000)
	s.ptch = make(chan *task, 10000)

No need.


worker/scheduler.go, line 64 at r4 (raw file):

	s.ptch = make(chan *task, 10000)
	s.schemaMap = make(map[string]chan struct{})
	s.pendingSchema = make(chan struct{}, 10)

1 should be enough.


worker/scheduler.go, line 88 at r4 (raw file):

func (s *scheduler) executeMutation(t *task) {
	err := s.n.processMutation(t.pid, t.rid, t.edge)

n := s.n


worker/scheduler.go, line 93 at r4 (raw file):

	posting.SyncMarkFor(s.n.gid).Done(t.rid)
	x.ActiveMutations.Add(-1)
	if nextTask := s.nextTask(t); nextTask != nil {

Just continue running mutations.


worker/scheduler.go, line 103 at r4 (raw file):

}

func (s *scheduler) canSchedule(t *task) bool {

register(t *task)


worker/scheduler.go, line 139 at r4 (raw file):

func (s *scheduler) schedule(proposal *protos.Proposal, index uint64) {
	total := len(proposal.Mutations.Edges) + len(proposal.Mutations.Schema)
	if total > 0 {

if total == 0 { return }


worker/scheduler.go, line 142 at r4 (raw file):

		s.n.props.IncRef(proposal.Id, total)
		x.ActiveMutations.Add(int64(total))
		s.n.applied.BeginWithCount(index, total)

Better to only mark applied as done once, instead of for every edge.


worker/scheduler.go, line 143 at r4 (raw file):

		x.ActiveMutations.Add(int64(total))
		s.n.applied.BeginWithCount(index, total)
		posting.SyncMarkFor(s.n.gid).BeginWithCount(index, total)

same here.


worker/scheduler.go, line 147 at r4 (raw file):

	for _, supdate := range proposal.Mutations.Schema {
		s.waitForSchema(supdate.Predicate)
		s.scheduleSchema(proposal.Id, index, supdate)

just run them here.


worker/scheduler.go, line 200 at r4 (raw file):

}

func (s *scheduler) scheduleMutation(t *task) {

No need for this func.


x/watermark.go, line 56 at r4 (raw file):

	indices []uint64
	done    bool // Set to true if the pending mutation is done.
	count   int

No need for this change.


Comments from Reviewable

@manishrjain
Copy link
Contributor

:lgtm:


Reviewed 1 of 13 files at r1, 4 of 10 files at r3, 2 of 2 files at r5.
Review status: all files reviewed at latest revision, 23 unresolved discussions.


worker/draft.go, line 107 at r5 (raw file):

	ch    chan error
	ctx   context.Context
	cnt   int

and this.


worker/draft.go, line 108 at r5 (raw file):

	ctx   context.Context
	cnt   int
	err   error

Add a comment about this.


worker/draft.go, line 165 at r5 (raw file):

	pd.ch <- pd.err
	pd.n.applied.Done(pd.index)
	posting.SyncMarkFor(pd.n.gid).Done(pd.index)

Add a comment why you're marking it as done.


worker/draft.go, line 580 at r5 (raw file):

	x.ActiveMutations.Add(1)
	n.props.IncRef(pid, index, 1)
	defer x.ActiveMutations.Add(-1)

Right next to Add, and vertical space after.


worker/draft.go, line 627 at r5 (raw file):

		// One final applied and synced watermark would be emitted when proposal ctx ref count
		// becomes zero

full stop.


worker/mutation.go, line 44 at r5 (raw file):

// runMutations goes through all the edges and applies them. It returns the
// mutations which were not applied in left.
func runMutations(ctx context.Context, edge *protos.DirectedEdge) error {

runMutation, singular.


worker/mutation.go, line 89 at r5 (raw file):

// This is serialized with mutations, called after applied watermarks catch up
// and further mutations are blocked until this is done.
func runSchemaMutations(ctx context.Context, update *protos.SchemaUpdate) error {

runSchemaMutation.


worker/scheduler.go, line 64 at r5 (raw file):

}

func (s *scheduler) executeMutation(t *task) {

Move this func within processTasks.


Comments from Reviewable

@janardhan1993
Copy link
Contributor Author

Review status: 3 of 6 files reviewed at latest revision, 23 unresolved discussions.


worker/draft.go, line 630 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

This ref tracking should be done by the scheduler.

Done.


worker/draft.go, line 107 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

and this.

Done.


worker/draft.go, line 108 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Add a comment about this.

Done.


worker/draft.go, line 165 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Add a comment why you're marking it as done.

Done.


worker/draft.go, line 580 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Right next to Add, and vertical space after.

Done.


worker/draft.go, line 627 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

full stop.

Done.


worker/mutation.go, line 44 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

runMutation, singular.

Done.


worker/mutation.go, line 89 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

runSchemaMutation.

Done.


worker/scheduler.go, line 45 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

No need.

Done.


worker/scheduler.go, line 52 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Yeah, maybe just block everything.

Done.


worker/scheduler.go, line 61 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

1000 should be enough.

Done.


worker/scheduler.go, line 62 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

No need.

Done.


worker/scheduler.go, line 64 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

1 should be enough.

Done.


worker/scheduler.go, line 88 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

n := s.n

Done.


worker/scheduler.go, line 93 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Just continue running mutations.

Done.


worker/scheduler.go, line 103 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

register(t *task)

Done.


worker/scheduler.go, line 139 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

if total == 0 { return }

Done.


worker/scheduler.go, line 142 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Better to only mark applied as done once, instead of for every edge.

Done.


worker/scheduler.go, line 143 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

same here.

Done.


worker/scheduler.go, line 147 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

just run them here.

Done.


worker/scheduler.go, line 200 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

No need for this func.

Done.


worker/scheduler.go, line 64 at r5 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Move this func within processTasks.

Done.


x/watermark.go, line 56 at r4 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

No need for this change.

Done.


Comments from Reviewable

@janardhan1993 janardhan1993 merged commit ecbb38b into master Sep 1, 2017
@janardhan1993 janardhan1993 deleted the feature/scheduler branch September 1, 2017 04:39
jarifibrahim pushed a commit that referenced this pull request Jul 11, 2020
This commit brings following new changes from badger
This commit also disable conflict detection in badger to save memory.

```
0dfb8b4 Changelog for v20.07.0 (#1411)
03ba278 Add missing changelog for v2.0.3 (#1410)
6001230 Revert "Compress/Encrypt Blocks in the background (#1227)" (#1409)
800305e Revert "Buffer pool for decompression (#1308)" (#1408)
63d9309 Revert "fix: Fix race condition in block.incRef (#1337)" (#1407)
e0d058c Revert "add assert to check integer overflow for table size (#1402)" (#1406)
d981f47 return error if the vlog writes exceeds more that 4GB. (#1400)
7f4e4b5 add assert to check integer overflow for table size (#1402)
8e896a7 Add a contribution guide (#1379)
b79aeef Avoid panic on multiple closer.Signal calls (#1401)
717b89c Enable cross-compiled 32bit tests on TravisCI (#1392)
09dfa66 Update ristretto to commit f66de99 (#1391)
509de73 Update head while replaying value log (#1372)
e013bfd Rework DB.DropPrefix (#1381)
3042e37 pre allocate cache key for the block cache and the bloom filter cache (#1371)
675efcd Increase default valueThreshold from 32B to 1KB (#1346)
158d927 Remove second initialization of writech in Open (#1382)
d37ce36 Tests: Use t.Parallel in TestIteratePrefix tests  (#1377)
3f4761d Force KeepL0InMemory to be true when InMemory is true (#1375)
dd332b0 Avoid panic in filltables() (#1365)
c45d966 Fix assert in background compression and encryption. (#1366)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants