Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions go/store/blobstore/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (bs *InMemoryBlobstore) Path() string {
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
bs.mutex.Lock()
defer bs.mutex.Unlock()
bs.mutex.RLock()
defer bs.mutex.RUnlock()

if val, ok := bs.blobs[key]; ok {
if ver, ok := bs.versions[key]; ok && ver != "" {
Expand Down Expand Up @@ -114,6 +114,8 @@ func (bs *InMemoryBlobstore) CheckAndPut(ctx context.Context, expectedVersion, k
// For InMemoryBlobstore instances error should never be returned (though other
// implementations of this interface can)
func (bs *InMemoryBlobstore) Exists(ctx context.Context, key string) (bool, error) {
	// A read lock is enough here: the blobs map is only inspected.
	bs.mutex.RLock()
	_, found := bs.blobs[key]
	bs.mutex.RUnlock()

	// The lookup itself cannot fail, so the error is always nil.
	return found, nil
}
Expand Down
240 changes: 144 additions & 96 deletions go/store/nbs/conjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"time"

"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/store/hash"
)

type conjoinStrategy interface {
Expand All @@ -52,6 +50,7 @@ func (c inlineConjoiner) conjoinRequired(ts tableSet) bool {

// chooseConjoinees implements conjoinStrategy. Current approach is to choose the smallest N tables which,
// when removed and replaced with the conjoinment, will leave the conjoinment as the smallest table.
// We also keep taking table files until we get below maxTables.
func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, keepers []tableSpec, err error) {
sorted := make([]tableSpec, len(upstream))
copy(sorted, upstream)
Expand All @@ -65,7 +64,9 @@ func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, kee
for i < len(sorted) {
next := sorted[i].chunkCount
if sum <= next {
break
if c.maxTables == 0 || len(sorted)-i < c.maxTables {
break
}
}
sum += next
i++
Expand All @@ -86,115 +87,162 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
return
}

// conjoin attempts to use |p| to conjoin some number of tables referenced
// by |upstream|, allowing it to update |mm| with a new, smaller, set of tables
// that references precisely the same set of chunks. Conjoin() may not
// actually conjoin any upstream tables, usually because some out-of-
// process actor has already landed a conjoin of its own. Callers must
// handle this, likely by rebasing against upstream and re-evaluating the
// situation.
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) {
var conjoined tableSpec
var conjoinees, keepers, appendixSpecs []tableSpec
var cleanup cleanupFunc
// A conjoinOperation is a multi-step process that a NomsBlockStore runs to
// conjoin the table files in the store.
//
// Conjoining the table files in a store involves copying all the data
// from |n| files into a single file, and replacing the entries for
// those table files in the manifest with the single, conjoined table
// file. Conjoining is a periodic maintenance operation which is
// automatically done against NomsBlockStores.
//
// Conjoining a lot of chunks across a number of table files can take
// a long time. On every manifest update, including every Commit,
// NomsBlockStore checks if the store needs conjoining. If it does, it
// starts an asynchronous process which will create the new table
// file from the table files which have been chosen to be conjoined.
// This process will run in the background until the table file is
// created and in the right place. Then the conjoin finalization will
// take place. When finalizing a conjoin, the manifest contents of the
// store are updated. The conjoin only succeeds if all the table files
// which were conjoined are still in the manifest when we go to update
// it. Otherwise the conjoined table file is deleted and the store can
// try to create a new conjoined file if it is still necessary.
//
// A conjoinOperation is created when a conjoinStrategy |conjoinRequired| returns true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are very detailed docstrings. Thank you!

// conjoinOperation carries the state that is handed from one step of a
// conjoin to the next: |prepareConjoin| fills in |conjoinees|, |conjoin|
// fills in |conjoined| and |cleanup|, and |updateManifest| reads all three.
type conjoinOperation struct {
// The table files chosen to be conjoined. Set by |prepareConjoin|
// from the conjoinStrategy's choice and consumed by |conjoin| and
// |updateManifest|.
conjoinees []tableSpec
// The tableSpec for the conjoined file. Set by |conjoin|.
conjoined tableSpec

// Anything to run as cleanup after we complete successfully.
// This comes directly from persister.ConjoinAll, but needs to
// be run after the manifest update lands successfully.
cleanup cleanupFunc
}

for {
if conjoinees == nil {
// Appendix table files should never be conjoined
// so we remove them before conjoining and add them
// back after
if upstream.NumAppendixSpecs() != 0 {
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
// prepareConjoin decides which table files this operation will conjoin and
// records them in |op.conjoinees|. Appendix table files are stripped from
// |upstream| before |strat| is consulted, so they are never offered as
// conjoinees. This should be done synchronously and with the Mutex held by
// NomsBlockStore.
//
// Returns whatever error |strat.chooseConjoinees| reports, or nil.
func (op *conjoinOperation) prepareConjoin(ctx context.Context, strat conjoinStrategy, upstream manifestContents) error {
	if upstream.NumAppendixSpecs() != 0 {
		upstream, _ = upstream.removeAppendixSpecs()
	}

	// The keepers are not needed here; the manifest update recomputes
	// them against whatever upstream looks like at that time.
	chosen, _, err := strat.chooseConjoinees(upstream.specs)
	op.conjoinees = chosen
	return err
}

var err error
conjoinees, keepers, err = s.chooseConjoinees(upstream.specs)
if err != nil {
return manifestContents{}, nil, err
}
// conjoin builds the conjoined table file from the conjoinees previously
// selected by |prepareConjoin|, by delegating to conjoinTables. The resulting
// tableSpec and cleanupFunc are stored on |op| for the later manifest update.
// This should be done asynchronously by NomsBlockStore.
//
// Returns the error from conjoinTables, or nil.
func (op *conjoinOperation) conjoin(ctx context.Context, persister tablePersister, stats *Stats) error {
	spec, cleanup, err := conjoinTables(ctx, op.conjoinees, persister, stats)
	op.conjoined, op.cleanup = spec, cleanup
	return err
}

conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
if err != nil {
return manifestContents{}, nil, err
// Land the conjoin result in the manifest as an update
// which removes the conjoinees and adds the conjoined. Only updates
// the manifest by adding the conjoined file if all conjoinees are
// still present in the manifest.
//
// Whether the conjoined file lands or not, this returns a nil error
// if it runs to completion successfully and it returns a cleanupFunc
// which should be run.
func (op *conjoinOperation) updateManifest(ctx context.Context, upstream manifestContents, mm manifestUpdater, stats *Stats) (manifestContents, cleanupFunc, error) {
conjoineeSet := toSpecSet(op.conjoinees)
for {
upstreamSet := toSpecSet(upstream.specs)
canApply := true
alreadyApplied := false
for h := range conjoineeSet {
if _, ok := upstreamSet[h]; !ok {
canApply = false
break
}
}

specs := append(make([]tableSpec, 0, len(keepers)+1), conjoined)
if len(appendixSpecs) > 0 {
specs = append(make([]tableSpec, 0, len(specs)+len(appendixSpecs)), appendixSpecs...)
specs = append(specs, conjoined)
}

specs = append(specs, keepers...)

newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, specs, appendixSpecs, nil),
gcGen: upstream.gcGen,
specs: specs,
appendix: appendixSpecs,
}

var err error
upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil)
if err != nil {
return manifestContents{}, nil, err
}

if newContents.lock == upstream.lock {
return upstream, cleanup, nil
}

// Optimistic lock failure. Someone else moved the root, the
// set of tables, or both out from under us. If we can re-use
// the conjoin we already performed, we want to try again.
// Currently, we will only do so if ALL conjoinees are still
// present upstream. If we can't re-use...then someone else
// almost certainly landed a conjoin upstream. In this case,
// bail and let clients ask again if they think they still
// can't proceed.

// If the appendix has changed we simply bail
// and let the client retry
if len(appendixSpecs) > 0 {
if len(upstream.appendix) != len(appendixSpecs) {
return upstream, func() {}, nil
}
for i := range upstream.appendix {
if upstream.appendix[i].name != appendixSpecs[i].name {
return upstream, func() {}, nil
if canApply {
newSpecs := make([]tableSpec, len(upstream.specs)-len(conjoineeSet)+1)
ins := 0
for i, s := range upstream.specs {
if _, ok := conjoineeSet[s.name]; !ok {
newSpecs[ins] = s
ins += 1
}
if i == len(upstream.appendix) {
newSpecs[ins] = op.conjoined
ins += 1
}
}
newContents := manifestContents{
nbfVers: upstream.nbfVers,
root: upstream.root,
lock: generateLockHash(upstream.root, newSpecs, upstream.appendix, nil),
gcGen: upstream.gcGen,
specs: newSpecs,
appendix: upstream.appendix,
}

// No appendix change occurred, so we remove the appendix
// on the "latest" upstream which will be added back
// before the conjoin completes
upstream, appendixSpecs = upstream.removeAppendixSpecs()
}
updated, err := mm.Update(ctx, upstream.lock, newContents, stats, nil)
if err != nil {
return manifestContents{}, func() {}, err
}

conjoineeSet := map[hash.Hash]struct{}{}
upstreamNames := map[hash.Hash]struct{}{}
for _, spec := range upstream.specs {
upstreamNames[spec.name] = struct{}{}
}
for _, c := range conjoinees {
if _, present := upstreamNames[c.name]; !present {
return upstream, func() {}, nil // Bail!
if newContents.lock == updated.lock {
return updated, op.cleanup, nil
}
conjoineeSet[c.name] = struct{}{}
}

// Filter conjoinees out of upstream.specs to generate new set of keepers
keepers = make([]tableSpec, 0, len(upstream.specs)-len(conjoinees))
for _, spec := range upstream.specs {
if _, present := conjoineeSet[spec.name]; !present {
keepers = append(keepers, spec)
// Go back around the loop, trying to apply against the new upstream.
upstream = updated
} else {
if _, ok := upstreamSet[op.conjoined.name]; ok {
alreadyApplied = true
}
if !alreadyApplied {
// In theory we could delete the conjoined
// table file here, since its conjoinees are
// no longer in the manifest and it itself is
// not in the manifest either.
//
// tablePersister does not expose a
// functionality to prune it, and it will get
// picked up by GC anyway, so we do not do
// that here.
return upstream, func() {}, nil
} else {
return upstream, func() {}, nil
}
}
}
}

// conjoin performs a complete conjoin against |upstream| in a single
// synchronous call. It asks |s| which tables to conjoin, uses |p| to build
// the conjoined table, and then attempts to land the result through |mm|,
// producing a new, smaller, set of tables that references precisely the same
// set of chunks.
//
// The manifest update may leave the upstream tables unconjoined, for example
// when the chosen conjoinees are no longer all present upstream because some
// out-of-process actor has already landed a conjoin of its own. Callers must
// handle this, likely by rebasing against upstream and re-evaluating the
// situation.
//
// If choosing conjoinees or building the conjoined table fails, the zero
// manifestContents, a nil cleanupFunc and the error are returned.
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) {
	op := conjoinOperation{}

	if err := op.prepareConjoin(ctx, s, upstream); err != nil {
		return manifestContents{}, nil, err
	}
	if err := op.conjoin(ctx, p, stats); err != nil {
		return manifestContents{}, nil, err
	}

	return op.updateManifest(ctx, upstream, mm, stats)
}

func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, cleanup cleanupFunc, err error) {
eg, ectx := errgroup.WithContext(ctx)
toConjoin := make(chunkSources, len(conjoinees))
Expand Down
10 changes: 10 additions & 0 deletions go/store/nbs/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ func TestStats(t *testing.T) {
_, err = store.Commit(context.Background(), h, h)
require.NoError(t, err)

waitForConjoin(store)

assert.Equal(uint64(1), stats(store).ConjoinLatency.Samples())
// TODO: Once random conjoin hack is out, test other conjoin stats
}

// waitForConjoin blocks the calling test until |nbs| has no conjoin operation
// in flight, i.e. until nbs.conjoinOp is nil.
func waitForConjoin(nbs *NomsBlockStore) {
	nbs.mu.Lock()
	defer nbs.mu.Unlock()

	for {
		if nbs.conjoinOp == nil {
			return
		}
		// NOTE(review): presumably conjoinOpCond is a sync.Cond bound to
		// nbs.mu, so Wait releases the lock while blocked — confirm.
		nbs.conjoinOpCond.Wait()
	}
}
Loading
Loading