Skip to content

Commit

Permalink
levels: Do not keep more than one table builder in memory
Browse files Browse the repository at this point in the history
`levelsController.compactBuildTables` writes tables to disk
asynchronously in the background by spawning a goroutine
per iteration of the loop.

This has two unfortunate consequences:

- There is a data race on the `builder` variable between
  iterations of the loop
- An unlimited number of table builders can be open in memory
  at a point in time, which results in high memory usage and
  high amount of memory garbage
  • Loading branch information
damz committed Jun 17, 2020
1 parent dd332b0 commit f284a8d
Showing 1 changed file with 38 additions and 46 deletions.
84 changes: 38 additions & 46 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,26 +513,24 @@ func (s *levelsController) compactBuildTables(
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
dk, err := s.kv.registry.latestDataKey()
if err != nil {
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
resultCh := make(chan newTableResult)
var numBuilds, numVersions int
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache

var numVersions int
var lastKey, skipKey []byte
var vp valuePointer
newTables := make([]*table.Table, 0, 20)
var firstErr error
for it.Valid() {
timeStart := time.Now()
dk, err := s.kv.registry.latestDataKey()
if err != nil {
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
Expand Down Expand Up @@ -620,7 +618,14 @@ func (s *levelsController) compactBuildTables(
// called Add() at least once, and builder is not Empty().
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
build := func(fileID uint64) (*table.Table, error) {

if builder.Empty() {
continue
}

fileID := s.reserveFileID()

build := func(fileID uint64, builder *table.Builder) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
Expand All @@ -633,35 +638,24 @@ func (s *levelsController) compactBuildTables(
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
if builder.Empty() {
continue

var (
tbl *table.Table
err error
)
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID, builder)
}
numBuilds++
fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()
var (
tbl *table.Table
err error
)
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
resultCh <- newTableResult{tbl, err}
}(builder)
}

newTables := make([]*table.Table, 0, 20)
// Wait for all table builders to finish.
var firstErr error
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
firstErr = res.err
builder.Close()
if err != nil {
firstErr = err
break
}

newTables = append(newTables, tbl)
}

if firstErr == nil {
Expand All @@ -674,10 +668,8 @@ func (s *levelsController) compactBuildTables(
if firstErr != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
_ = newTables[j].DecrRef()
}
for _, table := range newTables {
_ = table.DecrRef()
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
Expand Down

0 comments on commit f284a8d

Please sign in to comment.