diff --git a/go/cmd/dolt/cli/arg_parser_helpers.go b/go/cmd/dolt/cli/arg_parser_helpers.go
index 4057974fbf2..dba70b3c1b9 100644
--- a/go/cmd/dolt/cli/arg_parser_helpers.go
+++ b/go/cmd/dolt/cli/arg_parser_helpers.go
@@ -294,6 +294,7 @@ func CreateLogArgParser(isTableFunction bool) *argparser.ArgParser {
 
 func CreateGCArgParser() *argparser.ArgParser {
 	ap := argparser.NewArgParserWithMaxArgs("gc", 0)
 	ap.SupportsFlag(ShallowFlag, "s", "perform a fast, but incomplete garbage collection pass")
+	ap.SupportsFlag(FullFlag, "f", "perform a full garbage collection, including the old generation")
 	return ap
 }
diff --git a/go/cmd/dolt/cli/flags.go b/go/cmd/dolt/cli/flags.go
index e1602e93b3a..13f72390853 100644
--- a/go/cmd/dolt/cli/flags.go
+++ b/go/cmd/dolt/cli/flags.go
@@ -37,6 +37,7 @@ const (
 	DryRunFlag     = "dry-run"
 	EmptyParam     = "empty"
 	ForceFlag      = "force"
+	FullFlag       = "full"
 	GraphFlag      = "graph"
 	HardResetParam = "hard"
 	HostFlag       = "host"
diff --git a/go/cmd/dolt/commands/gc.go b/go/cmd/dolt/commands/gc.go
index c99000fbdd4..96364d01d4d 100644
--- a/go/cmd/dolt/commands/gc.go
+++ b/go/cmd/dolt/commands/gc.go
@@ -33,9 +33,17 @@ var gcDocs = cli.CommandDocumentationContent{
 	ShortDesc: "Cleans up unreferenced data from the repository.",
 	LongDesc: `Searches the repository for data that is no longer referenced and no longer needed.
 
-If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.`,
+Dolt GC is generational. When a GC is run, everything reachable from any commit on any branch
+is put into the old generation. Data that is only reachable from uncommitted branch HEADs is kept in
+the new generation. By default, Dolt GC only visits data in the new generation, and so it will never
+collect data from deleted branches that has already made its way into the old generation by being
+copied there during a prior garbage collection.
+
+If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.
+
+If the {{.EmphasisLeft}}--full{{.EmphasisRight}} flag is supplied, a more thorough garbage collection will be performed, fully collecting both the old and new generations.`,
 	Synopsis: []string{
-		"[--shallow]",
+		"[--shallow|--full]",
 	},
 }
 
@@ -83,6 +91,10 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg
 	help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, gcDocs, ap))
 	apr := cli.ParseArgsOrDie(ap, args, help)
 
+	if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
+		return HandleVErrAndExitCode(errhand.BuildDError("Invalid Argument: --shallow is not compatible with --full").SetPrintUsage().Build(), usage)
+	}
+
 	queryist, sqlCtx, closeFunc, err := cliCtx.QueryEngine(ctx)
 	if err != nil {
 		return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
@@ -110,6 +122,9 @@ func constructDoltGCQuery(apr *argparser.ArgParseResults) (string, error) {
 	if apr.Contains(cli.ShallowFlag) {
 		query += "'--shallow'"
 	}
+	if apr.Contains(cli.FullFlag) {
+		query += "'--full'"
+	}
 	query += ")"
 	return query, nil
 }
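A regression-test sketch for the new flag handling might look like the following. This is a hypothetical addition (it assumes `argparser.ArgParser.Parse` and the existing `call dolt_gc(` prefix that `constructDoltGCQuery` builds on, and uses github.com/stretchr/testify/require):

```go
// Hypothetical test in go/cmd/dolt/commands, the same package as
// constructDoltGCQuery.
func TestConstructDoltGCQueryFull(t *testing.T) {
	ap := cli.CreateGCArgParser()

	apr, err := ap.Parse([]string{"--full"})
	require.NoError(t, err)

	query, err := constructDoltGCQuery(apr)
	require.NoError(t, err)

	// Assumes the query builder's existing "call dolt_gc(" prefix.
	require.Equal(t, "call dolt_gc('--full')", query)
}
```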
diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index a3dfff801e6..5953db4d03d 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -1633,7 +1633,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
 // until no possibly-stale ChunkStore state is retained in memory, or failing
 // certain in-progress operations which cannot be finalized in a timely manner,
 // etc.
-func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
+func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
 	collector, ok := ddb.db.Database.(datas.GarbageCollector)
 	if !ok {
 		return fmt.Errorf("this database does not support garbage collection")
@@ -1677,7 +1677,7 @@ func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
 		return err
 	}
 
-	return collector.GC(ctx, oldGen, newGen, safepointF)
+	return collector.GC(ctx, mode, oldGen, newGen, safepointF)
 }
 
 func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
diff --git a/go/libraries/doltcore/doltdb/gc_test.go b/go/libraries/doltcore/doltdb/gc_test.go
index f99b30d0a73..343c3213fbe 100644
--- a/go/libraries/doltcore/doltdb/gc_test.go
+++ b/go/libraries/doltcore/doltdb/gc_test.go
@@ -139,7 +139,7 @@ func testGarbageCollection(t *testing.T, test gcTest) {
 		}
 	}
 
-	err := dEnv.DoltDB.GC(ctx, nil)
+	err := dEnv.DoltDB.GC(ctx, types.GCModeDefault, nil)
 	require.NoError(t, err)
 
 	test.postGCFunc(ctx, t, dEnv.DoltDB, res)
@@ -208,7 +208,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
 	_, err = ns.Write(ctx, c1.Node())
 	require.NoError(t, err)
 
-	err = ddb.GC(ctx, nil)
+	err = ddb.GC(ctx, types.GCModeDefault, nil)
 	require.NoError(t, err)
 
 	c2 := newIntMap(t, ctx, ns, 2, 2)
diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
index 44ee99f7e21..4a28a3fe00c 100644
--- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
+++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
@@ -27,6 +27,7 @@ import (
 	"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
 	"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
 	"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
+	"github.com/dolthub/dolt/go/store/types"
 )
 
 const (
@@ -81,6 +82,10 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
 		return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
 	}
 
+	if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
+		return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr)
+	}
+
 	if apr.Contains(cli.ShallowFlag) {
 		err = ddb.ShallowGC(ctx)
 		if err != nil {
@@ -106,10 +111,15 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
 		origepoch = epoch.(int)
 	}
 
+	var mode types.GCMode = types.GCModeDefault
+	if apr.Contains(cli.FullFlag) {
+		mode = types.GCModeFull
+	}
+
 	// TODO: If we got a callback at the beginning and an
 	// (allowed-to-block) callback at the end, we could more
 	// gracefully tear things down.
-	err = ddb.GC(ctx, func() error {
+	err = ddb.GC(ctx, mode, func() error {
 		if origepoch != -1 {
 			// Here we need to sanity check role and epoch.
 			if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
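Since the CLI path above just issues a stored-procedure call, the same collection can be triggered over any SQL connection to a running sql-server. A minimal sketch (the DSN and database name are hypothetical):

```go
package main

import (
	"context"
	"database/sql"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Hypothetical DSN for a local dolt sql-server.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:3306)/mydb")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// '--full' and '--shallow' are mutually exclusive, per the check above.
	if _, err := db.ExecContext(context.Background(), "CALL DOLT_GC('--full')"); err != nil {
		log.Fatal(err)
	}
}
```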
diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go
index e1bed5c74c9..a074529ff7e 100644
--- a/go/store/chunks/chunk_store.go
+++ b/go/store/chunks/chunk_store.go
@@ -152,6 +152,34 @@ type LoggingChunkStore interface {
 
 var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")
 
+// HasManyFunc is the function type of ChunkStore.HasMany. Used as a return
+// value in the GCFinalizer interface.
+type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)
+
+// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
+//
+// A GCFinalizer is a handle to one or more table files which have been
+// constructed as part of the GC process. It can be used to add the table
+// files to the existing store, as we do in the case of a default-mode
+// collection into the old gen, and it can be used to replace all existing
+// table files in the store with the new table files, as we do in the
+// collection into the new gen.
+//
+// In addition, adding the table files to an existing store exposes a HasMany
+// implementation which inspects only the table files that were added, not
+// all the table files in the resulting store. This is an important part of
+// the full gc protocol, which works as follows:
+//
+// * Collect everything reachable from old gen refs into a new table file in the old gen.
+// * Add the new table file to the old gen.
+// * Collect everything reachable from new gen refs into the new gen, skipping anything that is already in the new old gen table file.
+// * Swap to the new gen table file.
+// * Swap to the old gen table file.
+type GCFinalizer interface {
+	AddChunksToStore(ctx context.Context) (HasManyFunc, error)
+	SwapChunksInStore(ctx context.Context) error
+}
+
 // ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
 type ChunkStoreGarbageCollector interface {
 	ChunkStore
@@ -185,7 +213,7 @@ type ChunkStoreGarbageCollector interface {
 	// This behavior is a little different for ValueStore.GC()'s
 	// interactions with generational stores. See ValueStore and
 	// NomsBlockStore/GenerationalNBS for details.
-	MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error
+	MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)
 
 	// Count returns the number of chunks in the store.
 	Count() (uint32, error)
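To make the two-phase shape of the new interface concrete, here is a minimal sketch of how a caller is expected to drive it, modeled on the calling sequence the test below uses. The helper name and channel sizing are illustrative, not part of the patch:

```go
package main

import (
	"context"

	"github.com/dolthub/dolt/go/store/chunks"
	"github.com/dolthub/dolt/go/store/hash"
)

// collectSketch drives the new two-phase API end to end: mark-and-sweep
// produces a GCFinalizer, and the caller then decides whether the copied
// table files are spliced into the store (AddChunksToStore) or become the
// store's only table files (SwapChunksInStore).
func collectSketch(ctx context.Context, store chunks.ChunkStoreGarbageCollector, keep []hash.Hash) error {
	if err := store.BeginGC(nil); err != nil {
		return err
	}
	defer store.EndGC()

	keepChan := make(chan []hash.Hash, 1)
	keepChan <- keep
	close(keepChan)

	// MarkAndSweepChunks drains keepChan into freshly written table files
	// and hands back a finalizer instead of mutating the store directly.
	finalizer, err := store.MarkAndSweepChunks(ctx, keepChan, store)
	if err != nil {
		return err
	}

	// Path 1 (default-mode collection into the old gen):
	//   hasMany, err := finalizer.AddChunksToStore(ctx)
	// Path 2 (collection into the new gen) replaces all table files:
	return finalizer.SwapChunksInStore(ctx)
}
```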
diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go
index 485eafea4ba..2af154312f7 100644
--- a/go/store/chunks/memory_store.go
+++ b/go/store/chunks/memory_store.go
@@ -343,7 +343,24 @@ func (ms *MemoryStoreView) EndGC() {
 	ms.transitionToNoGC()
 }
 
-func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
+type msvGcFinalizer struct {
+	ms      *MemoryStoreView
+	keepers map[hash.Hash]Chunk
+}
+
+func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyFunc, error) {
+	panic("unsupported")
+}
+
+func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
+	mgcf.ms.mu.Lock()
+	defer mgcf.ms.mu.Unlock()
+	mgcf.ms.storage = &MemoryStorage{rootHash: mgcf.ms.rootHash, data: mgcf.keepers}
+	mgcf.ms.pending = map[hash.Hash]Chunk{}
+	return nil
+}
+
+func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
 	if dest != ms {
 		panic("unsupported")
 	}
@@ -366,20 +383,16 @@ LOOP:
 			for _, h := range hs {
 				c, err := ms.Get(ctx, h)
 				if err != nil {
-					return err
+					return nil, err
 				}
 				keepers[h] = c
 			}
 		case <-ctx.Done():
-			return ctx.Err()
+			return nil, ctx.Err()
 		}
 	}
 
-	ms.mu.Lock()
-	defer ms.mu.Unlock()
-	ms.storage = &MemoryStorage{rootHash: ms.rootHash, data: keepers}
-	ms.pending = map[hash.Hash]Chunk{}
-	return nil
+	return msvGcFinalizer{ms, keepers}, nil
 }
 
 func (ms *MemoryStoreView) Count() (uint32, error) {
diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go
index 037210f3c7c..cf410c6afbc 100644
--- a/go/store/chunks/test_utils.go
+++ b/go/store/chunks/test_utils.go
@@ -91,10 +91,10 @@ func (s *TestStoreView) EndGC() {
 	collector.EndGC()
 }
 
-func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
+func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
 	collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
 	if !ok || dest != s {
-		return ErrUnsupportedOperation
+		return nil, ErrUnsupportedOperation
 	}
 	return collector.MarkAndSweepChunks(ctx, hashes, collector)
 }
diff --git a/go/store/datas/database.go b/go/store/datas/database.go
index 56559d48a30..5b3913d0a2e 100644
--- a/go/store/datas/database.go
+++ b/go/store/datas/database.go
@@ -194,7 +194,7 @@ type GarbageCollector interface {
 
 	// GC traverses the database starting at the Root and removes
 	// all unreferenced data from persistent storage.
-	GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
+	GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
 }
 
 // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go
index 8712f1bdfed..48075f99a5d 100644
--- a/go/store/datas/database_common.go
+++ b/go/store/datas/database_common.go
@@ -1148,8 +1148,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
 }
 
 // GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
-func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
-	return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs, safepointF)
+func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
+	return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
 }
 
 func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go
index 41221a1aff2..d637cba4093 100644
--- a/go/store/nbs/archive_chunk_source.go
+++ b/go/store/nbs/archive_chunk_source.go
@@ -151,7 +151,9 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord)
 }
 
 func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
-	return false, errors.New("Archive chunk source does not support getManyCompressed")
+	return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) {
+		found(ctx, ChunkToCompressedChunk(*chk))
+	}, stats)
 }
 
 func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go
index 67b28268ad6..03942c21353 100644
--- a/go/store/nbs/generational_chunk_store.go
+++ b/go/store/nbs/generational_chunk_store.go
@@ -30,6 +30,7 @@ import (
 var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
 var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
 var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
+var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)
 
 type GenerationalNBS struct {
 	oldGen *NomsBlockStore
@@ -492,3 +493,39 @@ func (gcs *GenerationalNBS) Path() (string, bool) {
 
 func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
 	return gcs.newGen.UpdateManifest(ctx, updates)
 }
+
+func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error {
+	return gcs.newGen.BeginGC(keeper)
+}
+
+func (gcs *GenerationalNBS) EndGC() {
+	gcs.newGen.EndGC()
+}
+
+func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
+	return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
+}
+
+func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
+	err := gcs.newGen.IterateAllChunks(ctx, cb)
+	if err != nil {
+		return err
+	}
+	err = gcs.oldGen.IterateAllChunks(ctx, cb)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (gcs *GenerationalNBS) Count() (uint32, error) {
+	newGenCnt, err := gcs.newGen.Count()
+	if err != nil {
+		return 0, err
+	}
+	oldGenCnt, err := gcs.oldGen.Count()
+	if err != nil {
+		return 0, err
+	}
+	return newGenCnt + oldGenCnt, nil
+}
diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go
index 78b7a75ec78..0294e992021 100644
--- a/go/store/nbs/nbs_metrics_wrapper.go
+++ b/go/store/nbs/nbs_metrics_wrapper.go
@@ -79,7 +79,7 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
 	nbsMW.nbs.EndGC()
 }
 
-func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error {
+func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
 	return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
 }
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index e17f1e847a8..054d0c57542 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -1037,6 +1037,34 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 	return nbs.hasMany(toHasRecords(hashes))
 }
 
+func (nbs *NomsBlockStore) hasManyInSources(srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) {
+	if hashes.Size() == 0 {
+		return nil, nil
+	}
+
+	t1 := time.Now()
+	defer nbs.stats.HasLatency.SampleTimeSince(t1)
+	nbs.stats.AddressesPerHas.SampleLen(hashes.Size())
+
+	nbs.mu.RLock()
+	defer nbs.mu.RUnlock()
+
+	records := toHasRecords(hashes)
+
+	_, err := nbs.tables.hasManyInSources(srcs, records)
+	if err != nil {
+		return nil, err
+	}
+
+	absent := hash.HashSet{}
+	for _, r := range records {
+		if !r.has {
+			absent.Insert(*r.a)
+		}
+	}
+	return absent, nil
+}
+
 func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) {
 	tables, remaining, err := func() (tables chunkReader, remaining bool, err error) {
 		tables = nbs.tables
@@ -1569,10 +1597,14 @@ func (nbs *NomsBlockStore) EndGC() {
 	nbs.cond.Broadcast()
 }
 
-func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error {
+func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
+	return markAndSweepChunks(ctx, hashes, nbs, nbs, dest)
+}
+
+func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
 	ops := nbs.SupportedOperations()
 	if !ops.CanGC || !ops.CanPrune {
-		return chunks.ErrUnsupportedOperation
+		return nil, chunks.ErrUnsupportedOperation
 	}
 
 	precheck := func() error {
@@ -1589,36 +1621,59 @@
 	}
 	err := precheck()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	destNBS := nbs
+	var destNBS *NomsBlockStore
 	if dest != nil {
 		switch typed := dest.(type) {
 		case *NomsBlockStore:
 			destNBS = typed
 		case NBSMetricWrapper:
 			destNBS = typed.nbs
+		default:
+			return nil, fmt.Errorf("cannot MarkAndSweep into a non-NomsBlockStore ChunkStore: %w", chunks.ErrUnsupportedOperation)
 		}
+	} else {
+		destNBS = nbs
 	}
 
-	specs, err := nbs.copyMarkedChunks(ctx, hashes, destNBS)
+	specs, err := copyMarkedChunks(ctx, hashes, src, destNBS)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	if ctx.Err() != nil {
-		return ctx.Err()
+		return nil, ctx.Err()
 	}
 
-	if destNBS == nbs {
-		return nbs.swapTables(ctx, specs)
-	} else {
-		fileIdToNumChunks := tableSpecsToMap(specs)
-		return destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks)
+	return gcFinalizer{
+		nbs:   destNBS,
+		specs: specs,
+	}, nil
+}
+
+type gcFinalizer struct {
+	nbs   *NomsBlockStore
+	specs []tableSpec
+}
+
+func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc, error) {
+	fileIdToNumChunks := tableSpecsToMap(gcf.specs)
+	var addrs []hash.Hash
+	for _, spec := range gcf.specs {
+		addrs = append(addrs, spec.name)
+	}
+	f := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) {
+		return gcf.nbs.hasManyInSources(addrs, hashes)
 	}
+	return f, gcf.nbs.AddTableFilesToManifest(ctx, fileIdToNumChunks)
+}
+
+func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
+	return gcf.nbs.swapTables(ctx, gcf.specs)
 }
 
-func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) {
+func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) {
 	tfp, ok := dest.p.(tableFilePersister)
 	if !ok {
 		return nil, fmt.Errorf("NBS does not support copying garbage collection")
@@ -1642,7 +1697,7 @@ LOOP:
 			mu := new(sync.Mutex)
 			hashset := hash.NewHashSet(hs...)
 			found := 0
-			err := nbs.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
+			err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) {
 				mu.Lock()
 				defer mu.Unlock()
 				if addErr != nil {
diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go
index dbb3ba9eb6b..56416130a29 100644
--- a/go/store/nbs/store_test.go
+++ b/go/store/nbs/store_test.go
@@ -339,7 +339,11 @@ func TestNBSCopyGC(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		require.NoError(t, st.BeginGC(nil))
-		msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
+		var finalizer chunks.GCFinalizer
+		finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil)
+		if msErr == nil {
+			msErr = finalizer.SwapChunksInStore(ctx)
+		}
 		st.EndGC()
 		wg.Done()
 	}()
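TestNBSCopyGC above only exercises the SwapChunksInStore path. A sketch of the other path, showing what the returned HasManyFunc means (the helper and its arguments are hypothetical, not part of the patch):

```go
// copyIntoDest runs a mark-and-sweep from st into dest and returns the
// finalizer's HasManyFunc. Note the contract: the func consults only the
// table files this pass wrote, and the set it returns contains the hashes
// that are ABSENT from them.
func copyIntoDest(ctx context.Context, st, dest *NomsBlockStore, keep []hash.Hash) (chunks.HasManyFunc, error) {
	if err := st.BeginGC(nil); err != nil {
		return nil, err
	}
	defer st.EndGC()

	keepChan := make(chan []hash.Hash, 1)
	keepChan <- keep
	close(keepChan)

	finalizer, err := st.MarkAndSweepChunks(ctx, keepChan, dest)
	if err != nil {
		return nil, err
	}
	return finalizer.AddChunksToStore(ctx)
}
```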
diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go
index cddb4699029..185743199a0 100644
--- a/go/store/nbs/table_set.go
+++ b/go/store/nbs/table_set.go
@@ -115,6 +115,44 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
 	return f(ts.upstream)
 }
 
+// Updates the records in |addrs| for whether they exist in this table set, but
+// only consults tables whose names appear in |srcs|, ignoring all other tables
+// in the table set. Returns |remaining| as true if any of the addresses were
+// not found in the consulted tables, and false otherwise.
+//
+// Intended to be exactly like |hasMany|, except filtering for the files
+// consulted. Only used for the part of the GC workflow where we want to have
+// access to all chunks in the store but need to check for existing chunk
+// presence in only a subset of its files.
+func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remaining bool, err error) {
+	for _, rec := range addrs {
+		if !rec.has {
+			remaining = true
+			break
+		}
+	}
+	if !remaining {
+		return false, nil
+	}
+	for _, srcAddr := range srcs {
+		src, ok := ts.novel[srcAddr]
+		if !ok {
+			src, ok = ts.upstream[srcAddr]
+			if !ok {
+				continue
+			}
+		}
+		remaining, err = src.hasMany(addrs)
+		if err != nil {
+			return false, err
+		}
+		if !remaining {
+			break
+		}
+	}
+	return remaining, nil
+}
+
 func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) {
 	if err := ctx.Err(); err != nil {
 		return nil, err
diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go
index ba4b493931e..2fba7a75b83 100644
--- a/go/store/types/value_store.go
+++ b/go/store/types/value_store.go
@@ -36,8 +36,6 @@ import (
 	"github.com/dolthub/dolt/go/store/util/sizecache"
 )
 
-type HashFilterFunc func(context.Context, hash.HashSet) (hash.HashSet, error)
-
 func unfilteredHashFunc(_ context.Context, hs hash.HashSet) (hash.HashSet, error) {
 	return hs, nil
 }
@@ -563,83 +561,144 @@ func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash {
 	return res
 }
 
+type GCMode int
+
+const (
+	GCModeDefault GCMode = iota
+	GCModeFull
+)
+
 // GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
-func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
+func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
 	lvs.versOnce.Do(lvs.expectVersion)
 
 	lvs.transitionToOldGenGC()
 	defer lvs.transitionToNoGC()
 
-	if gcs, ok := lvs.cs.(chunks.GenerationalCS); ok {
+	gcs, gcsOK := lvs.cs.(chunks.GenerationalCS)
+	collector, collectorOK := lvs.cs.(chunks.ChunkStoreGarbageCollector)
+
+	if gcsOK && collectorOK {
 		oldGen := gcs.OldGen()
 		newGen := gcs.NewGen()
 
-		err := newGen.BeginGC(lvs.gcAddChunk)
-		if err != nil {
-			return err
+		var oldGenHasMany chunks.HasManyFunc
+		switch mode {
+		case GCModeDefault:
+			oldGenHasMany = oldGen.HasMany
+		case GCModeFull:
+			oldGenHasMany = unfilteredHashFunc
+		default:
+			return fmt.Errorf("unsupported GCMode %v", mode)
 		}
 
-		root, err := lvs.Root(ctx)
-		if err != nil {
-			newGen.EndGC()
-			return err
-		}
+		err := func() error {
+			err := collector.BeginGC(lvs.gcAddChunk)
+			if err != nil {
+				return err
+			}
+			defer collector.EndGC()
 
-		if root == (hash.Hash{}) {
-			// empty root
-			newGen.EndGC()
-			return nil
-		}
+			root, err := lvs.Root(ctx)
+			if err != nil {
+				return err
+			}
 
-		oldGenRefs, err = oldGen.HasMany(ctx, oldGenRefs)
-		if err != nil {
-			return err
-		}
+			if root == (hash.Hash{}) {
+				// empty root
+				return nil
+			}
 
-		newGenRefs.Insert(root)
+			oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs)
+			if err != nil {
+				return err
+			}
 
-		err = lvs.gc(ctx, oldGenRefs, oldGen.HasMany, newGen, oldGen, nil, func() hash.HashSet {
-			n := lvs.transitionToNewGenGC()
-			newGenRefs.InsertAll(n)
-			return make(hash.HashSet)
-		})
-		if err != nil {
-			newGen.EndGC()
-			return err
-		}
+			newGenRefs.Insert(root)
+
+			var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer
+			oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet {
+				n := lvs.transitionToNewGenGC()
+				newGenRefs.InsertAll(n)
+				return make(hash.HashSet)
+			})
+			if err != nil {
+				return err
+			}
+
+			var newFileHasMany chunks.HasManyFunc
+			newFileHasMany, err = oldGenFinalizer.AddChunksToStore(ctx)
+			if err != nil {
+				return err
+			}
+
+			if mode == GCModeDefault {
+				oldGenHasMany = oldGen.HasMany
+			} else {
+				oldGenHasMany = newFileHasMany
+			}
+
+			newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
+			if err != nil {
+				return err
+			}
+
+			err = newGenFinalizer.SwapChunksInStore(ctx)
+			if err != nil {
+				return err
+			}
+
+			if mode == GCModeFull {
+				err = oldGenFinalizer.SwapChunksInStore(ctx)
+				if err != nil {
+					return err
+				}
+			}
+
+			return nil
+		}()
 
-		err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC)
-		newGen.EndGC()
 		if err != nil {
 			return err
 		}
-
-	} else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok {
+	} else if collectorOK {
 		extraNewGenRefs := lvs.transitionToNewGenGC()
 		newGenRefs.InsertAll(extraNewGenRefs)
 		newGenRefs.InsertAll(oldGenRefs)
 
-		err := collector.BeginGC(lvs.gcAddChunk)
-		if err != nil {
-			return err
-		}
+		err := func() error {
+			err := collector.BeginGC(lvs.gcAddChunk)
+			if err != nil {
+				return err
+			}
+			defer collector.EndGC()
 
-		root, err := lvs.Root(ctx)
-		if err != nil {
-			collector.EndGC()
-			return err
-		}
+			root, err := lvs.Root(ctx)
+			if err != nil {
+				return err
+			}
 
-		if root == (hash.Hash{}) {
-			// empty root
-			collector.EndGC()
-			return nil
-		}
+			if root == (hash.Hash{}) {
+				// empty root
+				return nil
+			}
 
-		newGenRefs.Insert(root)
+			newGenRefs.Insert(root)
+
+			var finalizer chunks.GCFinalizer
+			finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
+			if err != nil {
+				return err
+			}
+
+			err = finalizer.SwapChunksInStore(ctx)
+			if err != nil {
+				return err
+			}
+
+			return nil
+		}()
 
-		err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC)
-		collector.EndGC()
 		if err != nil {
 			return err
 		}
@@ -660,15 +718,19 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS
 
 func (lvs *ValueStore) gc(ctx context.Context,
 	toVisit hash.HashSet,
-	hashFilter HashFilterFunc,
+	hashFilter chunks.HasManyFunc,
 	src, dest chunks.ChunkStoreGarbageCollector,
 	safepointF func() error,
-	finalize func() hash.HashSet) error {
+	finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
 	keepChunks := make(chan []hash.Hash, gcBuffSize)
 
+	var gcFinalizer chunks.GCFinalizer
+
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.Go(func() error {
-		return src.MarkAndSweepChunks(ctx, keepChunks, dest)
+		var err error
+		gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest)
+		return err
 	})
 
 	keepHashes := func(hs []hash.Hash) error {
@@ -706,12 +768,13 @@ func (lvs *ValueStore) gc(ctx context.Context,
 		return nil
 	})
 
-	return eg.Wait()
+	err := eg.Wait()
+	return gcFinalizer, err
 }
 
 func (lvs *ValueStore) gcProcessRefs(ctx context.Context,
 	initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error,
-	walker *parallelRefWalker, hashFilter HashFilterFunc,
+	walker *parallelRefWalker, hashFilter chunks.HasManyFunc,
 	safepointF func() error, finalize func() hash.HashSet) error {
 
 	visited := make(hash.HashSet)
diff --git a/go/store/types/value_store_test.go b/go/store/types/value_store_test.go
index 5f15c3f9283..d797671c04c 100644
--- a/go/store/types/value_store_test.go
+++ b/go/store/types/value_store_test.go
@@ -198,7 +198,7 @@ func TestGC(t *testing.T) {
 	require.NoError(t, err)
 	assert.NotNil(v2)
 
-	err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{}, nil)
+	err = vs.GC(ctx, GCModeDefault, hash.HashSet{}, hash.HashSet{}, nil)
 	require.NoError(t, err)
 
 	v1, err = vs.ReadValue(ctx, h1) // non-nil
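A smoke-test sketch for the mode plumbing might accompany the test change above (hypothetical; `newTestValueStore` is assumed to be the helper the existing tests in this package use):

```go
func TestGCFullEmptyStore(t *testing.T) {
	ctx := context.Background()
	vs := newTestValueStore()

	// With an empty root the collection returns early (see the
	// "empty root" branch above), so this only checks that GCModeFull
	// plumbs through without error on a non-generational store.
	err := vs.GC(ctx, GCModeFull, hash.HashSet{}, hash.HashSet{}, nil)
	require.NoError(t, err)
}
```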
diff --git a/integration-tests/bats/garbage_collection.bats b/integration-tests/bats/garbage_collection.bats
index a9b9cdd63e0..321ce6ac74c 100644
--- a/integration-tests/bats/garbage_collection.bats
+++ b/integration-tests/bats/garbage_collection.bats
@@ -404,3 +404,75 @@ SQL
     echo "$AFTER"
     [ "$BEFORE" -gt "$AFTER" ]
 }
+
+@test "garbage_collection: dolt gc --full" {
+    # Create a lot of data on a new branch.
+    dolt checkout -b to_keep
+    dolt sql -q "CREATE TABLE vals (val LONGTEXT);"
+    str="hex(random_bytes(1024))"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+
+    dolt commit -Am 'create some data on a new commit.'
+
+    # Create a lot of data on another new branch.
+    dolt checkout main
+    dolt checkout -b to_delete
+    dolt sql -q "CREATE TABLE vals (val LONGTEXT);"
+    str="hex(random_bytes(1024))"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    str="$str,$str"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+    dolt sql -q "INSERT INTO vals VALUES (concat($str));"
+
+    dolt commit -Am 'create some data on a new commit.'
+
+    # GC it into the old gen.
+    dolt gc
+
+    # Get the repository size. Note: this is in 512-byte blocks.
+    BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
+
+    # Delete the branch with all the data.
+    dolt checkout main
+    dolt branch -D to_delete
+
+    # Check that a regular GC does not delete this data.
+    dolt gc
+    AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
+    [ $(($BEFORE - $AFTER)) -lt 16 ]
+
+    # Check that a full GC does delete this data.
+    # NOTE: We create and drop the tmp table here to get around Dolt's "GC is
+    # a no-op if there have been no writes since the last GC" check.
+    dolt sql -q 'create table tmp (id int); drop table tmp;'
+    dolt gc --full
+    AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
+    [ $(($BEFORE - $AFTER)) -gt 8192 ] # Reclaim at least 4MB, in 512-byte blocks.
+
+    # Sanity check that the stuff on to_keep is still accessible.
+    dolt checkout to_keep
+    dolt sql -q 'select length(val) from vals;'
+}