Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

levels: Do not keep more than one table builder in memory #1373

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 38 additions & 46 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,26 +513,24 @@ func (s *levelsController) compactBuildTables(
discardTs := s.kv.orc.discardAtOrBelow()

// Start generating new tables.
type newTableResult struct {
table *table.Table
err error
dk, err := s.kv.registry.latestDataKey()
if err != nil {
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
resultCh := make(chan newTableResult)
var numBuilds, numVersions int
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache

var numVersions int
var lastKey, skipKey []byte
var vp valuePointer
newTables := make([]*table.Table, 0, 20)
var firstErr error
for it.Valid() {
timeStart := time.Now()
dk, err := s.kv.registry.latestDataKey()
if err != nil {
return nil, nil,
y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")
}
bopts := buildTableOptions(s.kv.opt)
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
// Internal move keys related to the given prefix should also be skipped.
Expand Down Expand Up @@ -620,7 +618,14 @@ func (s *levelsController) compactBuildTables(
// called Add() at least once, and builder is not Empty().
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
numKeys, numSkips, time.Since(timeStart))
build := func(fileID uint64) (*table.Table, error) {

if builder.Empty() {
continue
}

fileID := s.reserveFileID()

build := func(fileID uint64, builder *table.Builder) (*table.Table, error) {
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
if err != nil {
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
Expand All @@ -633,35 +638,24 @@ func (s *levelsController) compactBuildTables(
// decrRef is added below.
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
}
if builder.Empty() {
continue

var (
tbl *table.Table
err error
)
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID, builder)
}
numBuilds++
fileID := s.reserveFileID()
go func(builder *table.Builder) {
defer builder.Close()
var (
tbl *table.Table
err error
)
if s.kv.opt.InMemory {
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
} else {
tbl, err = build(fileID)
}
resultCh <- newTableResult{tbl, err}
}(builder)
}

newTables := make([]*table.Table, 0, 20)
// Wait for all table builders to finish.
var firstErr error
for x := 0; x < numBuilds; x++ {
res := <-resultCh
newTables = append(newTables, res.table)
if firstErr == nil {
firstErr = res.err
builder.Close()
if err != nil {
firstErr = err
break
}

newTables = append(newTables, tbl)
}

if firstErr == nil {
Expand All @@ -674,10 +668,8 @@ func (s *levelsController) compactBuildTables(
if firstErr != nil {
// An error happened. Delete all the newly created table files (by calling DecrRef
// -- we're the only holders of a ref).
for j := 0; j < numBuilds; j++ {
if newTables[j] != nil {
_ = newTables[j].DecrRef()
}
for _, table := range newTables {
_ = table.DecrRef()
}
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
return nil, nil, errorReturn
Expand Down