Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dolt gc --full: Implement a flag for GC which will collect everything, including the old gen. #8462

Merged
merged 12 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/dolt/cli/arg_parser_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func CreateLogArgParser(isTableFunction bool) *argparser.ArgParser {
// CreateGCArgParser returns the argument parser for the `dolt gc` command.
// The command accepts no positional arguments and supports the --shallow and
// --full flags (the command itself rejects the two flags used together).
func CreateGCArgParser() *argparser.ArgParser {
	parser := argparser.NewArgParserWithMaxArgs("gc", 0)
	parser.SupportsFlag(ShallowFlag, "s", "perform a fast, but incomplete garbage collection pass")
	parser.SupportsFlag(FullFlag, "f", "perform a full garbage collection, including the old generation")
	return parser
}

Expand Down
1 change: 1 addition & 0 deletions go/cmd/dolt/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
DryRunFlag = "dry-run"
EmptyParam = "empty"
ForceFlag = "force"
FullFlag = "full"
GraphFlag = "graph"
HardResetParam = "hard"
HostFlag = "host"
Expand Down
19 changes: 17 additions & 2 deletions go/cmd/dolt/commands/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,17 @@ var gcDocs = cli.CommandDocumentationContent{
ShortDesc: "Cleans up unreferenced data from the repository.",
LongDesc: `Searches the repository for data that is no longer referenced and no longer needed.

If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.`,
Dolt GC is generational. When a GC is run, everything reachable from any commit on any branch
is put into the old generation. Data which is only reachable from uncommited branch HEADs is kept in
the new generation. By default, Dolt GC will only visit data in the new generation, and so will never
collect data from deleted branches which has previously made its way to the old generation from being
copied during a prior garbage collection.

If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.

If the {{.EmphasisLeft}}--full{{.EmphasisRight}} flag is supplied, a more thorough garbage collection, fully collecting the old gen and new gen, will be performed.`,
Synopsis: []string{
"[--shallow]",
"[--shallow|--full]",
},
}

Expand Down Expand Up @@ -83,6 +91,10 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg
help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, gcDocs, ap))
apr := cli.ParseArgsOrDie(ap, args, help)

if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
return HandleVErrAndExitCode(errhand.BuildDError("Invalid Argument: --shallow is not compatible with --full").SetPrintUsage().Build(), usage)
}

queryist, sqlCtx, closeFunc, err := cliCtx.QueryEngine(ctx)
if err != nil {
return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
Expand Down Expand Up @@ -110,6 +122,9 @@ func constructDoltGCQuery(apr *argparser.ArgParseResults) (string, error) {
if apr.Contains(cli.ShallowFlag) {
query += "'--shallow'"
}
if apr.Contains(cli.FullFlag) {
query += "'--full'"
}
query += ")"
return query, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
Expand Down Expand Up @@ -1677,7 +1677,7 @@ func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error {
return err
}

return collector.GC(ctx, oldGen, newGen, safepointF)
return collector.GC(ctx, mode, oldGen, newGen, safepointF)
}

func (ddb *DoltDB) ShallowGC(ctx context.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/doltdb/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func testGarbageCollection(t *testing.T, test gcTest) {
}
}

err := dEnv.DoltDB.GC(ctx, nil)
err := dEnv.DoltDB.GC(ctx, types.GCModeDefault, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are weird. Just an observation. I was thinking that I was going to ask for a test for the Full flag here, but it looks like these tests are basically just running Exec. Maybe bats is better than this?

sqlengine tests for this instead? Just sayin. These tests are weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I don't think these tests are worth much compared to a bats test which does the same. Obviously they're supposed to target the DoltDB.GC invocation itself, and they're much easier to attach a debugger to or whatever, but AFAICT they're not providing much in the way of protection compared to existing layers of cheese.

require.NoError(t, err)
test.postGCFunc(ctx, t, dEnv.DoltDB, res)

Expand Down Expand Up @@ -208,7 +208,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) {
_, err = ns.Write(ctx, c1.Node())
require.NoError(t, err)

err = ddb.GC(ctx, nil)
err = ddb.GC(ctx, types.GCModeDefault, nil)
require.NoError(t, err)

c2 := newIntMap(t, ctx, ns, 2, 2)
Expand Down
12 changes: 11 additions & 1 deletion go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/types"
)

const (
Expand Down Expand Up @@ -81,6 +82,10 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdFailure, fmt.Errorf("Could not load database %s", dbName)
}

if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) {
return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr)
}

if apr.Contains(cli.ShallowFlag) {
err = ddb.ShallowGC(ctx)
if err != nil {
Expand All @@ -106,10 +111,15 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
origepoch = epoch.(int)
}

var mode types.GCMode = types.GCModeDefault
if apr.Contains(cli.FullFlag) {
mode = types.GCModeFull
}

// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, func() error {
err = ddb.GC(ctx, mode, func() error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
Expand Down
30 changes: 29 additions & 1 deletion go/store/chunks/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,34 @@ type LoggingChunkStore interface {

var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block")

// HasManyFunc is the function type of ChunkStore.HasMany: it reports which of
// |hashes| are absent from the store being queried. It is used as a return
// value in the GCFinalizer interface, where the returned function consults
// only the table files that the finalizer added.
type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error)

// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed.
//
// A GCFinalizer is a handle to one or more table files which have been
// constructed as part of the GC process. It can be used to add the table files
// to the existing store, as we do in the case of a default-mode collection
// into the old gen, and it can be used to replace all existing table files in
// the store with the new table files, as we do in the collection into the new
// gen.
//
// In addition, adding the table files to an existing store exposes a HasMany
// implementation which inspects only the table files that were added, not all
// the table files in the resulting store. This is an important part of the
// full gc protocol, which works as follows:
//
// * Collect everything reachable from old gen refs into a new table file in the old gen.
// * Add the new table file to the old gen.
// * Collect everything reachable from new gen refs into the new gen, skipping stuff that is in the new old gen table file.
// * Swap to the new gen table file.
// * Swap to the old gen table file.
type GCFinalizer interface {
	// AddChunksToStore adds the constructed table files to the existing
	// store. The returned HasManyFunc consults only the newly added files.
	AddChunksToStore(ctx context.Context) (HasManyFunc, error)
	// SwapChunksInStore replaces all existing table files in the store
	// with the constructed table files.
	SwapChunksInStore(ctx context.Context) error
}

// ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection.
type ChunkStoreGarbageCollector interface {
ChunkStore
Expand Down Expand Up @@ -185,7 +213,7 @@ type ChunkStoreGarbageCollector interface {
// This behavior is a little different for ValueStore.GC()'s
// interactions with generational stores. See ValueStore and
// NomsBlockStore/GenerationalNBS for details.
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error
MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error)

// Count returns the number of chunks in the store.
Count() (uint32, error)
Expand Down
29 changes: 21 additions & 8 deletions go/store/chunks/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,24 @@ func (ms *MemoryStoreView) EndGC() {
ms.transitionToNoGC()
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
// msvGcFinalizer is the GCFinalizer for a MemoryStoreView collection. It
// holds the set of chunks which were visited during the mark phase
// (|keepers|) so that SwapChunksInStore can install them as the store's
// entire contents.
type msvGcFinalizer struct {
	ms      *MemoryStoreView
	keepers map[hash.Hash]Chunk
}

// AddChunksToStore implements GCFinalizer. Adding GC results to an existing
// memory store is not supported; MemoryStoreView only supports the full-swap
// path (SwapChunksInStore).
func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyFunc, error) {
	panic("unsupported")
}

// SwapChunksInStore implements GCFinalizer. It replaces the view's backing
// storage with exactly the kept chunks and discards any pending writes.
func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error {
	store := mgcf.ms
	store.mu.Lock()
	defer store.mu.Unlock()
	store.storage = &MemoryStorage{rootHash: store.rootHash, data: mgcf.keepers}
	store.pending = make(map[hash.Hash]Chunk)
	return nil
}

func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
if dest != ms {
panic("unsupported")
}
Expand All @@ -366,20 +383,16 @@ LOOP:
for _, h := range hs {
c, err := ms.Get(ctx, h)
if err != nil {
return err
return nil, err
}
keepers[h] = c
}
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
}
}

ms.mu.Lock()
defer ms.mu.Unlock()
ms.storage = &MemoryStorage{rootHash: ms.rootHash, data: keepers}
ms.pending = map[hash.Hash]Chunk{}
return nil
return msvGcFinalizer{ms, keepers}, nil
}

func (ms *MemoryStoreView) Count() (uint32, error) {
Expand Down
4 changes: 2 additions & 2 deletions go/store/chunks/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ func (s *TestStoreView) EndGC() {
collector.EndGC()
}

func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error {
func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) {
collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector)
if !ok || dest != s {
return ErrUnsupportedOperation
return nil, ErrUnsupportedOperation
}
return collector.MarkAndSweepChunks(ctx, hashes, collector)
}
Expand Down
2 changes: 1 addition & 1 deletion go/store/datas/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type GarbageCollector interface {

// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
}

// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all
Expand Down
4 changes: 2 additions & 2 deletions go/store/datas/database_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,8 +1148,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
}

// GC traverses the database starting at the Root and removes all unreferenced
// data from persistent storage. |mode| selects between the default
// generational collection and a full collection; it is forwarded, along with
// the ref sets and |safepointF|, unchanged to the underlying ValueStore.
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
	return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
}

func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {
Expand Down
4 changes: 3 additions & 1 deletion go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord)
}

// getManyCompressed serves compressed-chunk reads by delegating to getMany
// and re-compressing each decompressed chunk before handing it to |found|.
// NOTE(review): this compress-on-read adapter trades CPU for compatibility
// with callers that expect CompressedChunk values.
func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
	forward := func(ctx context.Context, chk *chunks.Chunk) {
		found(ctx, ChunkToCompressedChunk(*chk))
	}
	return acs.getMany(ctx, eg, reqs, forward, stats)
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
Expand Down
38 changes: 38 additions & 0 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
// Compile-time checks that *GenerationalNBS satisfies the chunk-store
// interfaces it is used as. The duplicate GenerationalCS assertion has been
// removed (it appeared twice in the original).
var _ chunks.ChunkStore = (*GenerationalNBS)(nil)
var _ chunks.GenerationalCS = (*GenerationalNBS)(nil)
var _ chunks.TableFileStore = (*GenerationalNBS)(nil)
var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil)

type GenerationalNBS struct {
oldGen *NomsBlockStore
Expand Down Expand Up @@ -492,3 +494,39 @@ func (gcs *GenerationalNBS) Path() (string, bool) {
func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) {
return gcs.newGen.UpdateManifest(ctx, updates)
}

// BeginGC implements chunks.ChunkStoreGarbageCollector by delegating to the
// new gen store only. NOTE(review): the old gen is not notified — presumably
// the |keeper| guard is only needed for new-gen writes; confirm this also
// holds for GCModeFull collections.
func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error {
	return gcs.newGen.BeginGC(keeper)
}

// EndGC implements chunks.ChunkStoreGarbageCollector. It ends the GC session
// on the new gen store, matching BeginGC, which only notified the new gen.
func (gcs *GenerationalNBS) EndGC() {
	gcs.newGen.EndGC()
}

// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector by running
// the shared markAndSweepChunks helper against the new gen, with this
// GenerationalNBS passed alongside — presumably to give the helper access to
// both generations; confirm against the helper's definition.
func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
	return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest)
}

// IterateAllChunks invokes |cb| for every chunk in both generations: the new
// gen first, then the old gen, stopping at the first error.
func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
	if err := gcs.newGen.IterateAllChunks(ctx, cb); err != nil {
		return err
	}
	return gcs.oldGen.IterateAllChunks(ctx, cb)
}

// Count returns the total number of chunks across both generations
// (new gen plus old gen), or the first error encountered.
func (gcs *GenerationalNBS) Count() (uint32, error) {
	var total uint32
	for _, count := range []func() (uint32, error){gcs.newGen.Count, gcs.oldGen.Count} {
		c, err := count()
		if err != nil {
			return 0, err
		}
		total += c
	}
	return total, nil
}
2 changes: 1 addition & 1 deletion go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (nbsMW *NBSMetricWrapper) EndGC() {
nbsMW.nbs.EndGC()
}

// MarkAndSweepChunks implements chunks.ChunkStoreGarbageCollector by
// forwarding directly to the wrapped NomsBlockStore; this wrapper records no
// metrics for GC traffic.
func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) {
	return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest)
}

Expand Down
Loading
Loading