diff --git a/go/store/blobstore/inmem.go b/go/store/blobstore/inmem.go index 73b59e56e5a..ca0173a564c 100644 --- a/go/store/blobstore/inmem.go +++ b/go/store/blobstore/inmem.go @@ -62,8 +62,8 @@ func (bs *InMemoryBlobstore) Path() string { // Get retrieves an io.reader for the portion of a blob specified by br along with // its version func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) { - bs.mutex.Lock() - defer bs.mutex.Unlock() + bs.mutex.RLock() + defer bs.mutex.RUnlock() if val, ok := bs.blobs[key]; ok { if ver, ok := bs.versions[key]; ok && ver != "" { @@ -114,6 +114,8 @@ func (bs *InMemoryBlobstore) CheckAndPut(ctx context.Context, expectedVersion, k // For InMemoryBlobstore instances error should never be returned (though other // implementations of this interface can) func (bs *InMemoryBlobstore) Exists(ctx context.Context, key string) (bool, error) { + bs.mutex.RLock() + defer bs.mutex.RUnlock() _, ok := bs.blobs[key] return ok, nil } diff --git a/go/store/nbs/conjoiner.go b/go/store/nbs/conjoiner.go index ad2a12ac61f..f2cec982ccf 100644 --- a/go/store/nbs/conjoiner.go +++ b/go/store/nbs/conjoiner.go @@ -28,8 +28,6 @@ import ( "time" "golang.org/x/sync/errgroup" - - "github.com/dolthub/dolt/go/store/hash" ) type conjoinStrategy interface { @@ -52,6 +50,7 @@ func (c inlineConjoiner) conjoinRequired(ts tableSet) bool { // chooseConjoinees implements conjoinStrategy. Current approach is to choose the smallest N tables which, // when removed and replaced with the conjoinment, will leave the conjoinment as the smallest table. +// We also keep taking table files until we get below maxTables. 
func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, keepers []tableSpec, err error) { sorted := make([]tableSpec, len(upstream)) copy(sorted, upstream) @@ -65,7 +64,9 @@ func (c inlineConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, kee for i < len(sorted) { next := sorted[i].chunkCount if sum <= next { - break + if c.maxTables == 0 || len(sorted)-i < c.maxTables { + break + } } sum += next i++ @@ -86,115 +87,162 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper return } -// conjoin attempts to use |p| to conjoin some number of tables referenced -// by |upstream|, allowing it to update |mm| with a new, smaller, set of tables -// that references precisely the same set of chunks. Conjoin() may not -// actually conjoin any upstream tables, usually because some out-of- -// process actor has already landed a conjoin of its own. Callers must -// handle this, likely by rebasing against upstream and re-evaluating the -// situation. -func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) { - var conjoined tableSpec - var conjoinees, keepers, appendixSpecs []tableSpec - var cleanup cleanupFunc +// A conjoinOperation is a multi-step process that a NomsBlockStore runs to +// conjoin the table files in the store. +// +// Conjoining the table files in a store involves copying all the data +// from |n| files into a single file, and replacing the entries for +// those table files in the manifest with the single, conjoined table +// file. Conjoining is a periodic maintenance operation which is +// automatically done against NomsBlockStores. +// +// Conjoining a lot of chunks across a number of table files can take +// a long time. On every manifest update, including every Commit, +// NomsBlockStore checks if the store needs conjoining. 
If it does, it +// starts an asynchronous process which will create the new table +// file from the table files which have been chosen to be conjoined. +// This process will run in the background until the table file is +// created and in the right place. Then the conjoin finalization will +// take place. When finalizing a conjoin, the manifest contents of the +// store are updated. The conjoin only succeeds if all the table files +// which were conjoined are still in the manifest when we go to update +// it. Otherwise the conjoined table file is left for GC to collect and the store can +// try to create a new conjoined file if it is still necessary. +// +// A conjoinOperation is created when a conjoinStrategy |conjoinRequired| returns true. +type conjoinOperation struct { + // The table files chosen by |prepareConjoin| to be conjoined in |conjoin|. + conjoinees []tableSpec + // The tableSpec for the conjoined file. + conjoined tableSpec + + // Anything to run as cleanup after we complete successfully. + // This comes directly from persister.ConjoinAll, but needs to + // be run after the manifest update lands successfully. + cleanup cleanupFunc +} - for { - if conjoinees == nil { - // Appendix table files should never be conjoined - // so we remove them before conjoining and add them - // back after - if upstream.NumAppendixSpecs() != 0 { - upstream, appendixSpecs = upstream.removeAppendixSpecs() - } +// Compute what we will conjoin and prepare to do it. This should be +// done synchronously and with the Mutex held by NomsBlockStore. 
+func (op *conjoinOperation) prepareConjoin(ctx context.Context, strat conjoinStrategy, upstream manifestContents) error { + if upstream.NumAppendixSpecs() != 0 { + upstream, _ = upstream.removeAppendixSpecs() + } + var err error + op.conjoinees, _, err = strat.chooseConjoinees(upstream.specs) + if err != nil { + return err + } + return nil +} - var err error - conjoinees, keepers, err = s.chooseConjoinees(upstream.specs) - if err != nil { - return manifestContents{}, nil, err - } +// Actually runs persister.ConjoinAll, after conjoinees are chosen by +// |prepareConjoin|. This should be done asynchronously by +// NomsBlockStore. +func (op *conjoinOperation) conjoin(ctx context.Context, persister tablePersister, stats *Stats) error { + var err error + op.conjoined, op.cleanup, err = conjoinTables(ctx, op.conjoinees, persister, stats) + if err != nil { + return err + } + return nil +} - conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats) - if err != nil { - return manifestContents{}, nil, err +// Land the update in the conjoin result in the manifest as an update +// which removes the conjoinees and adds the conjoined. Only updates +// the manifest by adding the conjoined file if all conjoinees are +// still present in the manifest. +// +// Whether the conjoined file lands or not, this returns a nil error +// if it runs to completion successfully and it returns a cleanupFunc +// which should be run. 
+func (op *conjoinOperation) updateManifest(ctx context.Context, upstream manifestContents, mm manifestUpdater, stats *Stats) (manifestContents, cleanupFunc, error) { + conjoineeSet := toSpecSet(op.conjoinees) + for { + upstreamSet := toSpecSet(upstream.specs) + canApply := true + alreadyApplied := false + for h := range conjoineeSet { + if _, ok := upstreamSet[h]; !ok { + canApply = false + break } } - - specs := append(make([]tableSpec, 0, len(keepers)+1), conjoined) - if len(appendixSpecs) > 0 { - specs = append(make([]tableSpec, 0, len(specs)+len(appendixSpecs)), appendixSpecs...) - specs = append(specs, conjoined) - } - - specs = append(specs, keepers...) - - newContents := manifestContents{ - nbfVers: upstream.nbfVers, - root: upstream.root, - lock: generateLockHash(upstream.root, specs, appendixSpecs, nil), - gcGen: upstream.gcGen, - specs: specs, - appendix: appendixSpecs, - } - - var err error - upstream, err = mm.Update(ctx, upstream.lock, newContents, stats, nil) - if err != nil { - return manifestContents{}, nil, err - } - - if newContents.lock == upstream.lock { - return upstream, cleanup, nil - } - - // Optimistic lock failure. Someone else moved to the root, the - // set of tables, or both out from under us. If we can re-use - // the conjoin we already performed, we want to try again. - // Currently, we will only do so if ALL conjoinees are still - // present upstream. If we can't re-use...then someone else - // almost certainly landed a conjoin upstream. In this case, - // bail and let clients ask again if they think they still - // can't proceed. 
- - // If the appendix has changed we simply bail - // and let the client retry - if len(appendixSpecs) > 0 { - if len(upstream.appendix) != len(appendixSpecs) { - return upstream, func() {}, nil - } - for i := range upstream.appendix { - if upstream.appendix[i].name != appendixSpecs[i].name { - return upstream, func() {}, nil + if canApply { + newSpecs := make([]tableSpec, len(upstream.specs)-len(conjoineeSet)+1) + ins := 0 + for i, s := range upstream.specs { + if _, ok := conjoineeSet[s.name]; !ok { + newSpecs[ins] = s + ins += 1 } + if i == len(upstream.appendix) { + newSpecs[ins] = op.conjoined + ins += 1 + } + } + newContents := manifestContents{ + nbfVers: upstream.nbfVers, + root: upstream.root, + lock: generateLockHash(upstream.root, newSpecs, upstream.appendix, nil), + gcGen: upstream.gcGen, + specs: newSpecs, + appendix: upstream.appendix, } - // No appendix change occurred, so we remove the appendix - // on the "latest" upstream which will be added back - // before the conjoin completes - upstream, appendixSpecs = upstream.removeAppendixSpecs() - } + updated, err := mm.Update(ctx, upstream.lock, newContents, stats, nil) + if err != nil { + return manifestContents{}, func() {}, err + } - conjoineeSet := map[hash.Hash]struct{}{} - upstreamNames := map[hash.Hash]struct{}{} - for _, spec := range upstream.specs { - upstreamNames[spec.name] = struct{}{} - } - for _, c := range conjoinees { - if _, present := upstreamNames[c.name]; !present { - return upstream, func() {}, nil // Bail! + if newContents.lock == updated.lock { + return updated, op.cleanup, nil } - conjoineeSet[c.name] = struct{}{} - } - // Filter conjoinees out of upstream.specs to generate new set of keepers - keepers = make([]tableSpec, 0, len(upstream.specs)-len(conjoinees)) - for _, spec := range upstream.specs { - if _, present := conjoineeSet[spec.name]; !present { - keepers = append(keepers, spec) + // Go back around the loop, trying to apply against the new upstream. 
+ upstream = updated + } else { + if _, ok := upstreamSet[op.conjoined.name]; ok { + alreadyApplied = true + } + if !alreadyApplied { + // In theory we could delete the conjoined + // table file here, since its conjoinees are + // no longer in the manifest and it itself is + // not in the manifest either. + // + // tablePersister does not expose a + // functionality to prune it, and it will get + // picked up by GC anyway, so we do not do + // that here. + return upstream, func() {}, nil + } else { + return upstream, func() {}, nil } } } } +// conjoin attempts to use |p| to conjoin some number of tables referenced +// by |upstream|, allowing it to update |mm| with a new, smaller, set of tables +// that references precisely the same set of chunks. Conjoin() may not +// actually conjoin any upstream tables, usually because some out-of- +// process actor has already landed a conjoin of its own. Callers must +// handle this, likely by rebasing against upstream and re-evaluating the +// situation. 
+func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, cleanupFunc, error) { + var op conjoinOperation + err := op.prepareConjoin(ctx, s, upstream) + if err != nil { + return manifestContents{}, nil, err + } + err = op.conjoin(ctx, p, stats) + if err != nil { + return manifestContents{}, nil, err + } + return op.updateManifest(ctx, upstream, mm, stats) +} + func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, cleanup cleanupFunc, err error) { eg, ectx := errgroup.WithContext(ctx) toConjoin := make(chunkSources, len(conjoinees)) diff --git a/go/store/nbs/stats_test.go b/go/store/nbs/stats_test.go index 17e1aad0ee8..ef8326d2f15 100644 --- a/go/store/nbs/stats_test.go +++ b/go/store/nbs/stats_test.go @@ -147,6 +147,16 @@ func TestStats(t *testing.T) { _, err = store.Commit(context.Background(), h, h) require.NoError(t, err) + waitForConjoin(store) + assert.Equal(uint64(1), stats(store).ConjoinLatency.Samples()) // TODO: Once random conjoin hack is out, test other conjoin stats } + +func waitForConjoin(nbs *NomsBlockStore) { + nbs.mu.Lock() + defer nbs.mu.Unlock() + for nbs.conjoinOp != nil { + nbs.conjoinOpCond.Wait() + } +} diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 7867ca52393..1041451f4f5 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -109,6 +109,9 @@ type NomsBlockStore struct { tables tableSet upstream manifestContents + conjoinOp *conjoinOperation + conjoinOpCond *sync.Cond + // Guarded by |mu|. Notified on gcInProgress and gcOutstandingReads changes. // Used to implement |waitForGC|. 
gcCond *sync.Cond @@ -269,34 +272,78 @@ func (nbs *NomsBlockStore) handleUnlockedRead(ctx context.Context, gcb gcBehavio } } -func (nbs *NomsBlockStore) conjoinIfRequired(ctx context.Context) (bool, error) { +func (nbs *NomsBlockStore) startConjoinIfRequired(ctx context.Context) error { + if nbs.conjoinOp != nil { + return nil + } if nbs.conjoiner.conjoinRequired(nbs.tables) { nbs.logger.WithField("upstream_len", len(nbs.tables.upstream)).Info("beginning conjoin of database") - newUpstream, cleanup, err := conjoin(ctx, nbs.conjoiner, nbs.upstream, nbs.manifestMgr, nbs.persister, nbs.stats) + var op = &conjoinOperation{} + err := op.prepareConjoin(ctx, nbs.conjoiner, nbs.upstream) if err != nil { - nbs.logger.WithError(err).Info("conjoin of database failed") - return false, err + return err } + nbs.conjoinOp = op + go func(ctx context.Context) { + // We use context.Background(), since this context will outlive the caller + // and it does not access NomsBlockStore storage directly, instead operating + // only on tablePersister and manifestUpdater. + err := op.conjoin(ctx, nbs.persister, nbs.stats) + nbs.finalizeConjoin(ctx, err) + }(context.Background()) + } + return nil +} - newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nil, nbs.stats) - if err != nil { - nbs.logger.WithError(err).Info("during conjoin, updating database with new table files failed") - return false, err - } +// Called in an asynchronous context from the goroutine that |startConjoinIfRequired| kicks off. +// +// Responsible for calling conjoinOp.updateManifest under lock and dealing with its results. 
+func (nbs *NomsBlockStore) finalizeConjoin(ctx context.Context, err error) { + nbs.mu.Lock() + defer nbs.mu.Unlock() - nbs.upstream = newUpstream - oldTables := nbs.tables - nbs.tables = newTables - nbs.logger.WithField("new_upstream_len", len(nbs.tables.upstream)).Info("conjoin completed successfully") - err = oldTables.close() + defer func() { + nbs.conjoinOp = nil + nbs.conjoinOpCond.Broadcast() + }() + + if err != nil { + nbs.logger.WithError(err).Warn("conjoin of database failed with error") + return + } + + nbs.manifestMgr.LockForUpdate() + defer func() { + err := nbs.manifestMgr.UnlockForUpdate() if err != nil { - return true, err + nbs.logger.WithError(err).Warn("during conjoin, unlocking manifest manager for update failed with error") } - cleanup() - return true, nil - } else { - return false, nil + }() + + newUpstream, cleanup, err := nbs.conjoinOp.updateManifest(ctx, nbs.upstream, nbs.manifestMgr, nbs.stats) + if err != nil { + nbs.logger.WithError(err).Warn("during conjoin, updating database manifest with new table files failed") + // On error updateManifest returns an empty manifestContents; bail out so we never rebase onto it or install it as nbs.upstream. + return + } + + newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nil, nbs.stats) + if err != nil { + nbs.logger.WithError(err).Warn("during conjoin, updating database with new table files failed") + return } + + nbs.upstream = newUpstream + oldTables := nbs.tables + nbs.tables = newTables + nbs.logger.WithField("new_upstream_len", len(nbs.tables.upstream)).Info("conjoin completed successfully") + err = oldTables.close() + if err != nil { + nbs.logger.WithError(err).Warn("during conjoin, closing old table files failed with error") + return + } + + cleanup() } func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (ManifestInfo, error) { @@ -321,7 +368,7 @@ func (nbs *NomsBlockStore) updateManifestAddFiles(ctx context.Context, updates m err = errors.Join(err, nbs.manifestMgr.UnlockForUpdate()) }() - _, err = nbs.conjoinIfRequired(ctx) + err = nbs.startConjoinIfRequired(ctx) if err != nil 
{ return manifestContents{}, false, err } @@ -655,6 +700,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager logger: logrus.StandardLogger().WithField("pkg", "store.noms"), } nbs.gcCond = sync.NewCond(&nbs.mu) + nbs.conjoinOpCond = sync.NewCond(&nbs.mu) t1 := time.Now() defer nbs.stats.OpenLatency.SampleTimeSince(t1) @@ -1376,13 +1422,10 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has break } - didConjoin, err := nbs.conjoinIfRequired(ctx) + err := nbs.startConjoinIfRequired(ctx) if err != nil { return err } - if didConjoin { - return errOptimisticLockFailedTables - } // check for dangling reference to the new root if err = nbs.errorIfDangling(current, checker); err != nil { @@ -1453,17 +1496,16 @@ func (nbs *NomsBlockStore) AccessMode() chunks.ExclusiveAccessMode { return nbs.persister.AccessMode() } -func (nbs *NomsBlockStore) Close() (err error) { - if cerr := nbs.persister.Close(); cerr != nil { - err = cerr - } - if cerr := nbs.tables.close(); cerr != nil { - err = cerr - } - if cerr := nbs.manifestMgr.Close(); cerr != nil { - err = cerr +func (nbs *NomsBlockStore) Close() error { + nbs.mu.Lock() + defer nbs.mu.Unlock() + for nbs.conjoinOp != nil { + nbs.conjoinOpCond.Wait() } - return + err := nbs.persister.Close() + err = errors.Join(err, nbs.tables.close()) + err = errors.Join(err, nbs.manifestMgr.Close()) + return err } func (nbs *NomsBlockStore) Stats() interface{} { @@ -1849,8 +1891,12 @@ func (nbs *NomsBlockStore) pruneTableFiles(ctx context.Context) (err error) { } func (nbs *NomsBlockStore) BeginGC(keeper func(hash.Hash) bool, _ chunks.GCMode) error { - nbs.gcCond.L.Lock() - defer nbs.gcCond.L.Unlock() + nbs.mu.Lock() + defer nbs.mu.Unlock() + // Block until there is no ongoing conjoin... 
+ for nbs.conjoinOp != nil { + nbs.conjoinOpCond.Wait() + } if nbs.gcInProgress { return errors.New("gc already in progress") } diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 7a3670df1e2..2eb9d2b82d4 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -213,10 +213,13 @@ func TestNBSPruneTableFiles(t *testing.T) { }, st.refCheck) require.NoError(t, err) require.True(t, ok) + ok, err = st.Commit(ctx, st.upstream.root, st.upstream.root) require.True(t, ok) require.NoError(t, err) + waitForConjoin(st) + _, sources, _, err := st.Sources(ctx) require.NoError(t, err) assert.Greater(t, numTableFiles, len(sources))