Bulkloader stage 2 (shuffle) #1406

Merged
merged 26 commits into from
Sep 6, 2017

@peterstace peterstace commented Sep 6, 2017

  • After N sorted files are dumped to disk, the shuffle phase begins.
  • Each file is opened in its own goroutine.
  • Each goroutine parses its file for flat postings.
  • Each posting is pushed onto a channel (one channel per posting file).
  • A min-heap data structure tracks each of the channels.
  • The heap reads off the channels to provide the final sorted stream of flat postings.
  • Completely sorted flat postings are batched up and sent to the reduce phase (currently stubbed out; to be completed in the next phase).
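
The steps above amount to a k-way merge. The following is a minimal sketch of that idea, not the code from this PR: the `posting` type with a single `Key` field is a hypothetical stand-in for `*protos.FlatPosting`, and the signature of `shufflePostings` and its `batchSize` parameter are assumptions made for illustration. It assumes every input channel delivers postings in sorted order and is closed once its file is exhausted.

```go
package main

import "container/heap"

// posting is a hypothetical stand-in for *protos.FlatPosting; only the
// sort key matters for this sketch.
type posting struct {
	Key string
}

// heapNode pairs the current head posting of one input with its channel.
type heapNode struct {
	posting *posting
	ch      <-chan *posting
}

// postingHeap is a min-heap of heapNodes ordered by posting key.
type postingHeap struct {
	nodes []heapNode
}

func (h *postingHeap) Len() int           { return len(h.nodes) }
func (h *postingHeap) Less(i, j int) bool { return h.nodes[i].posting.Key < h.nodes[j].posting.Key }
func (h *postingHeap) Swap(i, j int)      { h.nodes[i], h.nodes[j] = h.nodes[j], h.nodes[i] }
func (h *postingHeap) Push(x interface{}) { h.nodes = append(h.nodes, x.(heapNode)) }
func (h *postingHeap) Pop() interface{} {
	n := len(h.nodes)
	last := h.nodes[n-1]
	h.nodes = h.nodes[:n-1]
	return last
}

// shufflePostings merges the sorted input channels into one sorted stream,
// sends it on out in batches of up to batchSize postings, then closes out.
func shufflePostings(out chan<- []*posting, ins []<-chan *posting, batchSize int) {
	var ph postingHeap
	// Seed the heap with the first posting of every non-empty input.
	for _, ch := range ins {
		if p, ok := <-ch; ok {
			heap.Push(&ph, heapNode{posting: p, ch: ch})
		}
	}
	var batch []*posting
	for ph.Len() > 0 {
		node := ph.nodes[0] // the smallest head across all inputs
		batch = append(batch, node.posting)
		if len(batch) == batchSize {
			out <- batch
			batch = nil // the receiver now owns the sent slice
		}
		if p, ok := <-node.ch; ok {
			// Replace the root with the next posting from the same input.
			ph.nodes[0].posting = p
			heap.Fix(&ph, 0)
		} else {
			// That input is exhausted, so drop its node from the heap.
			heap.Pop(&ph)
		}
	}
	if len(batch) > 0 {
		out <- batch
	}
	close(out)
}
```

Because the heap holds only one posting per input, memory use in this sketch grows with the number of files and the batch size rather than with the total number of postings.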

@manishrjain
Contributor

Reviewed 8 of 8 files at r1.
Review status: all files reviewed at latest revision, 13 unresolved discussions, some commit checks broke.


cmd/bulkloader/loader.go, line 37 at r1 (raw file):

	*state
	mappers     []*mapper
	mappedFiles []string

mapOutput


cmd/bulkloader/loader.go, line 74 at r1 (raw file):

	go func() {
		ld.mappedFiles = writeMappedFiles(tmpPostingsDir, ld.postingsCh, ld.prog)

writeMapOutput


cmd/bulkloader/loader.go, line 113 at r1 (raw file):

func (ld *loader) reduceStage() {
	// Read from map stage.
	flatPostingChs := make([]chan *protos.FlatPosting, len(ld.mappedFiles))

shuffleInputChs


cmd/bulkloader/loader.go, line 114 at r1 (raw file):

	// Read from map stage.
	flatPostingChs := make([]chan *protos.FlatPosting, len(ld.mappedFiles))
	uniDirFlatPostingChs := make([]<-chan *protos.FlatPosting, len(ld.mappedFiles))

No need for this.


cmd/bulkloader/loader.go, line 122 at r1 (raw file):

	// Shuffle concurrently with reduce.
	batchCh := make(chan []*protos.FlatPosting, 2) // Small buffer size since each element has a lot of data.

3


cmd/bulkloader/loader.go, line 122 at r1 (raw file):

	// Shuffle concurrently with reduce.
	batchCh := make(chan []*protos.FlatPosting, 2) // Small buffer size since each element has a lot of data.

reduceCh


cmd/bulkloader/loader.go, line 126 at r1 (raw file):

	// Reduce stage.
	counter := make(chan struct{}, ld.opt.numGoroutines)

pending
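
For context on the naming suggestion: a buffered channel of empty structs is a common Go idiom for a counting semaphore, where the channel's capacity bounds how many goroutines run at once. The sketch below shows that idiom under stated assumptions. The function `processBatches`, its `fn` callback, and the `[]int` batch type are hypothetical; the actual reduce stage in this PR is stubbed out and may be structured differently.

```go
package main

import "sync"

// processBatches calls fn once per batch, each call in its own goroutine,
// with at most numGoroutines calls in flight at any time.
func processBatches(batches [][]int, numGoroutines int, fn func([]int)) {
	// pending acts as a counting semaphore: one slot per running goroutine.
	pending := make(chan struct{}, numGoroutines)
	var wg sync.WaitGroup
	for _, b := range batches {
		pending <- struct{}{} // blocks while numGoroutines calls are running
		wg.Add(1)
		go func(b []int) {
			defer wg.Done()
			defer func() { <-pending }() // free the slot when fn returns
			fn(b)
		}(b)
	}
	wg.Wait() // do not return until every batch has been processed
}
```

The name `pending` describes what the channel holds (work that has started but not finished), which is arguably clearer than `counter`.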


cmd/bulkloader/progress.go, line 44 at r1 (raw file):

func (p *progress) reportOnce() {

remove v space.


cmd/bulkloader/write_flat.go, line 71 at r1 (raw file):

}

func readFlatFile(filename string, postingCh chan<- *protos.FlatPosting) {

readMapOutput(filename string, shuffleCh)


cmd/bulkloader/write_flat.go, line 90 at r1 (raw file):

		x.Check2(r.Discard(n))

		for len(unmarshalBuf) < int(sz) {

if cap(unmarshalBuf) < int(sz) { create slice of size sz }
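
A minimal sketch of the suggested capacity check, assuming the goal is to reuse one buffer across records and allocate only when a record is larger than anything seen so far. The helper name `growBuf` is hypothetical and does not appear in the PR.

```go
package main

// growBuf returns a slice of length sz. It reuses buf's backing array when
// the capacity is already sufficient and allocates a new array otherwise.
// The contents of a newly allocated slice are zeroed; reused bytes are kept.
func growBuf(buf []byte, sz int) []byte {
	if cap(buf) < sz {
		buf = make([]byte, sz)
	}
	return buf[:sz]
}
```

A read loop could then call `unmarshalBuf = growBuf(unmarshalBuf, int(sz))` before filling the buffer for each record, which replaces the growth loop with a single check.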


cmd/bulkloader/write_flat.go, line 102 at r1 (raw file):

}

func shuffleFlatFiles(batchCh chan<- []*protos.FlatPosting,

shufflePostings


cmd/bulkloader/write_flat.go, line 139 at r1 (raw file):

type heapNode struct {
	head *protos.FlatPosting

posting


cmd/bulkloader/write_flat.go, line 144 at r1 (raw file):

type postingHeap struct {
	data []heapNode

nodes []heapNode


Comments from Reviewable

@peterstace
Author

Review status: 5 of 8 files reviewed at latest revision, 13 unresolved discussions.


cmd/bulkloader/loader.go, line 37 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

mapOutput

Done.


cmd/bulkloader/loader.go, line 74 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

writeMapOutput

Done.


cmd/bulkloader/loader.go, line 113 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

shuffleInputChs

Done.


cmd/bulkloader/loader.go, line 114 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

No need for this.

Done.


cmd/bulkloader/loader.go, line 122 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

3

Done.


cmd/bulkloader/loader.go, line 122 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

reduceCh

Done.


cmd/bulkloader/loader.go, line 126 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

pending

Done.


cmd/bulkloader/progress.go, line 44 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

remove v space.

Done.


cmd/bulkloader/write_flat.go, line 71 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

readMapOutput(filename string, shuffleCh)

Done.


cmd/bulkloader/write_flat.go, line 90 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

if cap(unmarshalBuf) < int(sz) { create slice of size sz }

Done.


cmd/bulkloader/write_flat.go, line 102 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

shufflePostings

Done.


cmd/bulkloader/write_flat.go, line 139 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

posting

Done.


cmd/bulkloader/write_flat.go, line 144 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

nodes []heapNode

Done.


@manishrjain
Contributor

LGTM


Reviewed 3 of 3 files at r2.
Review status: all files reviewed at latest revision, all discussions resolved.


@peterstace peterstace merged commit 4142a6a into master Sep 6, 2017
janardhan1993 pushed a commit that referenced this pull request Sep 6, 2017
@pawanrawal pawanrawal deleted the bulkloader_stage_2_rebased branch December 19, 2017 08:40
jarifibrahim pushed a commit that referenced this pull request Jul 11, 2020
This commit brings in the following new changes from badger.
It also disables conflict detection in badger to save memory.

```
0dfb8b4 Changelog for v20.07.0 (#1411)
03ba278 Add missing changelog for v2.0.3 (#1410)
6001230 Revert "Compress/Encrypt Blocks in the background (#1227)" (#1409)
800305e Revert "Buffer pool for decompression (#1308)" (#1408)
63d9309 Revert "fix: Fix race condition in block.incRef (#1337)" (#1407)
e0d058c Revert "add assert to check integer overflow for table size (#1402)" (#1406)
d981f47 return error if the vlog writes exceeds more that 4GB. (#1400)
7f4e4b5 add assert to check integer overflow for table size (#1402)
8e896a7 Add a contribution guide (#1379)
b79aeef Avoid panic on multiple closer.Signal calls (#1401)
717b89c Enable cross-compiled 32bit tests on TravisCI (#1392)
09dfa66 Update ristretto to commit f66de99 (#1391)
509de73 Update head while replaying value log (#1372)
e013bfd Rework DB.DropPrefix (#1381)
3042e37 pre allocate cache key for the block cache and the bloom filter cache (#1371)
675efcd Increase default valueThreshold from 32B to 1KB (#1346)
158d927 Remove second initialization of writech in Open (#1382)
d37ce36 Tests: Use t.Parallel in TestIteratePrefix tests  (#1377)
3f4761d Force KeepL0InMemory to be true when InMemory is true (#1375)
dd332b0 Avoid panic in filltables() (#1365)
c45d966 Fix assert in background compression and encryption. (#1366)
```