diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index ae65d9293f6..e18b5d12964 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -34,6 +34,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/servercfg" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" @@ -92,6 +93,13 @@ func NewSqlEngine( mrEnv *env.MultiRepoEnv, config *SqlEngineConfig, ) (*SqlEngine, error) { + gcSafepointController := gcctx.NewGCSafepointController() + ctx = gcctx.WithGCSafepointController(ctx, gcSafepointController) + + defer gcctx.SessionEnd(ctx) + gcctx.SessionCommandBegin(ctx) + defer gcctx.SessionCommandEnd(ctx) + dbs, locations, err := CollectDBs(ctx, mrEnv, config.Bulk) if err != nil { return nil, err @@ -137,8 +145,6 @@ func NewSqlEngine( locations = append(locations, nil) } - gcSafepointController := dsess.NewGCSafepointController() - b := env.GetDefaultInitBranch(mrEnv.Config()) pro, err := dsqle.NewDoltDatabaseProviderWithDatabases(b, mrEnv.FileSystem(), all, locations) if err != nil { @@ -452,7 +458,7 @@ func sqlContextFactory(ctx context.Context, session sql.Session) (*sql.Context, } // doltSessionFactory returns a sessionFactory that creates a new DoltSession -func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, gcSafepointController *dsess.GCSafepointController, autocommit bool) sessionFactory { +func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, gcSafepointController *gcctx.GCSafepointController, autocommit bool) sessionFactory { return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) { doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession, gcSafepointController) if err != nil { diff --git a/go/libraries/doltcore/doltdb/gcctx/context.go b/go/libraries/doltcore/doltdb/gcctx/context.go new file mode 100644 index 00000000000..120b401698f --- /dev/null +++ b/go/libraries/doltcore/doltdb/gcctx/context.go @@ -0,0 +1,86 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcctx + +import ( + "context" + + "github.com/dolthub/dolt/go/store/hash" +) + +type ctxKey int + +var safepointControllerkey ctxKey + +type ctxState struct { + controller *GCSafepointController +} + +// Creates a |Context| that registers GC safepoint lifecycle events +// with the given GCSafepointController. +// +// The lifecycle events themselves are done through the functions +// |SessionEnd|, |SessionCommandBegin| and |SessionCommandEnd| in this +// package. +// +// Sessions registered with the safepoint controller this way +// currently do not have a way to have their GC roots visited. As a +// consequence, they cannot hold database state in memory outside of +// lifecycle events. This is still useful for accessing doltdb.DoltDB +// data from things like background threads and interactings with the +// GC safepoint mechanism. All uses which occur from within a proper +// SQL context should instead of sql.Session{End,Command{Begin,End}} +// on the *DoltSession. +func WithGCSafepointController(ctx context.Context, controller *GCSafepointController) context.Context { + state := &ctxState{ + controller: controller, + } + ret := context.WithValue(ctx, safepointControllerkey, state) + return ret +} + +func SessionEnd(ctx context.Context) { + state := ctx.Value(safepointControllerkey).(*ctxState) + state.controller.SessionEnd(state) +} + +func SessionCommandBegin(ctx context.Context) { + state := ctx.Value(safepointControllerkey).(*ctxState) + state.controller.SessionCommandBegin(state) +} + +func SessionCommandEnd(ctx context.Context) { + state := ctx.Value(safepointControllerkey).(*ctxState) + state.controller.SessionCommandEnd(state) +} + +func GetGCSafepointController(ctx context.Context) *GCSafepointController { + if v := ctx.Value(safepointControllerkey); v != nil { + return v.(*ctxState).controller + } + return nil +} + +func GetValidate(ctx context.Context) func() { + return ctx.Value(safepointControllerkey).(*ctxState).Validate +} + +func (*ctxState) VisitGCRoots(context.Context, string, func(hash.Hash) bool) error { + return nil +} + +func (s *ctxState) Validate() { + s.controller.Validate(s) +} diff --git a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go b/go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller.go similarity index 79% rename from go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go rename to go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller.go index 5c84ea14a01..7a1340f67b6 100644 --- a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller.go +++ b/go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller.go @@ -12,20 +12,46 @@ // See the License for the specific language governing permissions and // limitations under the License. -package dsess +package gcctx import ( "context" "errors" "sync" "sync/atomic" + + "github.com/dolthub/dolt/go/store/hash" ) type GCSafepointController struct { mu sync.Mutex // All known sessions. The first command registers the session // here and SessionEnd causes it to be removed. - sessions map[*DoltSession]*GCSafepointSessionState + sessions map[GCRootsProvider]*GCSafepointSessionState +} + +// A GCRootsProvider is the thing that a GCSafepointController +// tracks. It also calls it a |session|, as it represents a client +// interacting with the database. The GCRootsProvider is registered +// with the Controller through the first SessionCommandBegin() call, +// and it goes through lifecycle callbacks which effect safepoint +// establishment when it gets passed to SessionEnd and +// SessionCommandEnd as well. +// +// A GCRootsProvider implements a single method |VisitGCRoots|, which +// will be called if a GC needs to establish a safepoint while the +// session is alive. The method is responsible for passing every live +// in-memory chunk address related to the given |db| to the provided +// |roots| callback. If it cannot do this for some reason, it should +// return an error, but in that case the GC will fail. A +// |VisitGCRoots| function will never be called after a +// |GCRootsProvider| is given to |gcctx.SessionCommandBegin| until it +// is given to |gcctx.SessionCommandEnd| again. If there is an +// outstanding call to |VisitGCRoots| when |sql.SessionCommandBegin| +// is called with a given roots provider, that call will block until +// the call to |VisitGCRoots| completes. +type GCRootsProvider interface { + VisitGCRoots(ctx context.Context, db string, roots func(hash.Hash) bool) error } type GCSafepointSessionState struct { @@ -97,7 +123,7 @@ type GCSafepointWaiter struct { func NewGCSafepointController() *GCSafepointController { return &GCSafepointController{ - sessions: make(map[*DoltSession]*GCSafepointSessionState), + sessions: make(map[GCRootsProvider]*GCSafepointSessionState), } } @@ -112,7 +138,7 @@ func NewGCSafepointController() *GCSafepointController { // // After creating a Waiter, it is an error to create a new Waiter before the |Wait| method of the // original watier has returned. This error is not guaranteed to always be detected. -func (c *GCSafepointController) Waiter(ctx context.Context, thisSession *DoltSession, visitQuiescedSession func(context.Context, *DoltSession) error) *GCSafepointWaiter { +func (c *GCSafepointController) Waiter(ctx context.Context, thisSession GCRootsProvider, visitQuiescedSession func(context.Context, GCRootsProvider) error) *GCSafepointWaiter { c.mu.Lock() defer c.mu.Unlock() ret := &GCSafepointWaiter{controller: c} @@ -227,7 +253,7 @@ func (w *GCSafepointWaiter) Wait(ctx context.Context) error { // one command can be outstanding at a time, and whether a command // is outstanding controls how |Waiter| treats the Session when it // is setting up all Sessions to visit their GC roots. -func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error { +func (c *GCSafepointController) SessionCommandBegin(s GCRootsProvider) error { c.mu.Lock() defer c.mu.Unlock() var state *GCSafepointSessionState @@ -237,7 +263,7 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error { c.sessions[s] = state } if state.OutstandingCommand { - panic("SessionBeginCommand called on a session that already had an outstanding command.") + panic("SessionCommandBegin called on a session that already had an outstanding command.") } // Step #2: Receiving from QuiesceCallbackDone blocks, then // the callback for this Session is still outstanding. We @@ -249,7 +275,7 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error { c.mu.Lock() if state.OutstandingCommand { // Concurrent calls to SessionCommandBegin. Bad times... - panic("SessionBeginCommand called on a session that already had an outstanding command.") + panic("SessionCommandBegin called on a session that already had an outstanding command.") } } // Step #3. Record that a command is running so that Waiter @@ -259,11 +285,23 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error { return nil } +// Called as part of valctx context validation, this asserts that the +// session is registered with an open command. +func (c *GCSafepointController) Validate(s GCRootsProvider) { + c.mu.Lock() + defer c.mu.Unlock() + if state := c.sessions[s]; state == nil { + panic("GCSafepointController.Validate; expected session with an open command, but no session registered with controller.") + } else if !state.OutstandingCommand { + panic("GCSafepointController.Validate; expected session with an open command, but the registered session has OutstandingCommand == false.") + } +} + // SessionCommandEnd marks the end of a session command. It has for // effects that the session no longer has an OutstandingCommand and, // if CommandEndCallback was non-nil, the callback itself has been // called and the CommandEndCallback field has been reset to |nil|. -func (c *GCSafepointController) SessionCommandEnd(s *DoltSession) { +func (c *GCSafepointController) SessionCommandEnd(s GCRootsProvider) { c.mu.Lock() defer c.mu.Unlock() state := c.sessions[s] @@ -284,16 +322,16 @@ func (c *GCSafepointController) SessionCommandEnd(s *DoltSession) { // if we already knew about it. It is an error to call this on a // session which currently has an outstanding command. // -// Because we only register sessions when the BeginCommand, it is +// Because we only register sessions when the CommandBegin, it is // possible to get a SessionEnd callback for a session that was // never registered. // // This callback does not block for any outstanding |visitQuiescedSession| // callback to be completed before allowing the session to unregister -// itself. It is an error for the application to call |SessionBeginCommand| +// itself. It is an error for the application to call |SessionCommandBegin| // on a session after it is has called |SessionEnd| on it, but that error // is not necessarily detected. -func (c *GCSafepointController) SessionEnd(s *DoltSession) { +func (c *GCSafepointController) SessionEnd(s GCRootsProvider) { c.mu.Lock() defer c.mu.Unlock() state := c.sessions[s] diff --git a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller_test.go b/go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller_test.go similarity index 88% rename from go/libraries/doltcore/sqle/dsess/gc_safepoint_controller_test.go rename to go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller_test.go index 4d74b6bc109..21cb95410cf 100644 --- a/go/libraries/doltcore/sqle/dsess/gc_safepoint_controller_test.go +++ b/go/libraries/doltcore/doltdb/gcctx/gc_safepoint_controller_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package dsess +package gcctx import ( "context" @@ -20,6 +20,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/hash" ) func TestGCSafepointController(t *testing.T) { @@ -29,12 +31,12 @@ func TestGCSafepointController(t *testing.T) { t.Run("UnknownSession", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - controller.SessionEnd(&DoltSession{}) + controller.SessionEnd(&visitable{}) }) t.Run("KnownSession", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} controller.SessionCommandBegin(sess) controller.SessionCommandEnd(sess) controller.SessionEnd(sess) @@ -42,7 +44,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("RunningSession", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} controller.SessionCommandBegin(sess) require.Panics(t, func() { controller.SessionEnd(sess) @@ -54,7 +56,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("RunningSession", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} controller.SessionCommandBegin(sess) require.Panics(t, func() { controller.SessionCommandBegin(sess) @@ -63,7 +65,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("AfterCommandEnd", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} controller.SessionCommandBegin(sess) controller.SessionCommandEnd(sess) controller.SessionCommandBegin(sess) @@ -74,7 +76,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("NotKnown", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} require.Panics(t, func() { controller.SessionCommandEnd(sess) }) @@ -82,7 +84,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("NotRunning", func(t *testing.T) { t.Parallel() controller := NewGCSafepointController() - sess := &DoltSession{} + sess := &visitable{} controller.SessionCommandBegin(sess) controller.SessionCommandEnd(sess) require.Panics(t, func() { @@ -95,7 +97,7 @@ func TestGCSafepointController(t *testing.T) { t.Run("Empty", func(t *testing.T) { t.Parallel() var nilCh chan struct{} - block := func(context.Context, *DoltSession) error { + block := func(context.Context, GCRootsProvider) error { <-nilCh return nil } @@ -106,11 +108,11 @@ func TestGCSafepointController(t *testing.T) { t.Run("OnlyThisSession", func(t *testing.T) { t.Parallel() var nilCh chan struct{} - block := func(context.Context, *DoltSession) error { + block := func(context.Context, GCRootsProvider) error { <-nilCh return nil } - sess := &DoltSession{} + sess := &visitable{} controller := NewGCSafepointController() controller.SessionCommandBegin(sess) waiter := controller.Waiter(context.Background(), sess, block) @@ -124,14 +126,14 @@ func TestGCSafepointController(t *testing.T) { // but not within a command and another one // is within a command at the time the // waiter is created. - quiesced := &DoltSession{} - running := &DoltSession{} + quiesced := &visitable{} + running := &visitable{} controller := NewGCSafepointController() controller.SessionCommandBegin(quiesced) controller.SessionCommandBegin(running) controller.SessionCommandEnd(quiesced) sawQuiesced, sawRunning, waitDone := make(chan struct{}), make(chan struct{}), make(chan struct{}) - wait := func(_ context.Context, s *DoltSession) error { + wait := func(_ context.Context, s GCRootsProvider) error { if s == quiesced { close(sawQuiesced) } else if s == running { @@ -165,14 +167,14 @@ func TestGCSafepointController(t *testing.T) { t.Parallel() // When the Wait context is canceled, we do not block on // the running sessions and they never get visited. - quiesced := &DoltSession{} - running := &DoltSession{} + quiesced := &visitable{} + running := &visitable{} controller := NewGCSafepointController() controller.SessionCommandBegin(quiesced) controller.SessionCommandBegin(running) controller.SessionCommandEnd(quiesced) sawQuiesced, sawRunning, waitDone := make(chan struct{}), make(chan struct{}), make(chan struct{}) - wait := func(_ context.Context, s *DoltSession) error { + wait := func(_ context.Context, s GCRootsProvider) error { if s == quiesced { close(sawQuiesced) } else if s == running { @@ -212,15 +214,15 @@ func TestGCSafepointController(t *testing.T) { }) t.Run("BeginBlocksUntilVisitFinished", func(t *testing.T) { t.Parallel() - quiesced := &DoltSession{} - running := &DoltSession{} + quiesced := &visitable{} + running := &visitable{} controller := NewGCSafepointController() controller.SessionCommandBegin(quiesced) controller.SessionCommandEnd(quiesced) controller.SessionCommandBegin(running) finishQuiesced, finishRunning := make(chan struct{}), make(chan struct{}) sawQuiesced, sawRunning := make(chan struct{}), make(chan struct{}) - wait := func(_ context.Context, s *DoltSession) error { + wait := func(_ context.Context, s GCRootsProvider) error { if s == quiesced { close(sawQuiesced) <-finishQuiesced @@ -250,7 +252,7 @@ func TestGCSafepointController(t *testing.T) { case <-time.After(50 * time.Millisecond): } - newSession := &DoltSession{} + newSession := &visitable{} controller.SessionCommandBegin(newSession) controller.SessionCommandEnd(newSession) controller.SessionEnd(newSession) @@ -283,10 +285,19 @@ func TestGCSafepointController(t *testing.T) { controller.SessionEnd(quiesced) controller.SessionEnd(running) - err := controller.Waiter(context.Background(), nil, func(context.Context, *DoltSession) error { + err := controller.Waiter(context.Background(), nil, func(context.Context, GCRootsProvider) error { panic("unexpected registered session") }).Wait(context.Background()) require.NoError(t, err) }) }) } + +type visitable struct { + // Give it an unused memory so it gets a unique address. + state bool +} + +func (*visitable) VisitGCRoots(ctx context.Context, db string, keep func(hash.Hash) bool) error { + return nil +} diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index e41e2d28bb6..b111c03e138 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -28,6 +28,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" @@ -157,17 +158,17 @@ func (sc killConnectionsSafepointController) CancelSafepoint() { } type sessionAwareSafepointController struct { - controller *dsess.GCSafepointController + controller *gcctx.GCSafepointController dbname string callSession *dsess.DoltSession origEpoch int doltDB *doltdb.DoltDB - waiter *dsess.GCSafepointWaiter + waiter *gcctx.GCSafepointWaiter keeper func(hash.Hash) bool } -func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error { +func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess gcctx.GCRootsProvider) error { return sess.VisitGCRoots(ctx, sc.dbname, sc.keeper) } diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index 0fc5afc1640..106a33de725 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -29,6 +29,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess/mutexmap" @@ -67,10 +68,29 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb mm: mutexmap.NewMutexMap(), init: make(chan struct{}), } - ait.runInitWithRootsAsync(ctx, roots...) + ctx = context.Background() + gcSafepointController := getGCSafepointController(ctx) + if gcSafepointController != nil { + ctx = gcctx.WithGCSafepointController(ctx, gcSafepointController) + } + go func() { + if gcSafepointController != nil { + defer gcctx.SessionEnd(ctx) + gcctx.SessionCommandBegin(ctx) + defer gcctx.SessionCommandEnd(ctx) + } + ait.initWithRoots(ctx, roots...) + }() return &ait, nil } +func getGCSafepointController(ctx context.Context) *gcctx.GCSafepointController { + if sqlCtx, ok := ctx.(*sql.Context); ok { + return DSessFromSess(sqlCtx.Session).GCSafepointController() + } + return gcctx.GetGCSafepointController(ctx) +} + func loadAutoIncValue(sequences *sync.Map, tableName string) uint64 { tableName = strings.ToLower(tableName) current, hasCurrent := sequences.Load(tableName) @@ -449,14 +469,18 @@ func (a *AutoIncrementTracker) waitForInit() error { } } -func (a *AutoIncrementTracker) runInitWithRootsAsync(ctx context.Context, roots ...doltdb.Rootish) { - go func() { - defer close(a.init) - a.initErr = a.initWithRoots(ctx, roots...) - }() -} - +// This method will initialize the AutoIncrementTracker state with all +// data from the tables found in |roots|. This method closes the +// |a.init| channel when it completes. It is meant to be run in a +// goroutine, as in `go a.initWithRoots(...)`. When running this method, +// a newly allocated |a.init| channel should exist. +// +// It is the caller's responsibility to ensure that whatever |ctx| +// |initWithRoots| is called with appropriately outlives the end of +// the method and that it participates in GC lifecycle callbacks +// appropriately, if that is necessary. func (a *AutoIncrementTracker) initWithRoots(ctx context.Context, roots ...doltdb.Rootish) error { + defer close(a.init) eg, egCtx := errgroup.WithContext(ctx) eg.SetLimit(128) @@ -500,6 +524,6 @@ func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltd return err } a.init = make(chan struct{}) - a.runInitWithRootsAsync(ctx, roots...) + go a.initWithRoots(ctx, roots...) return a.waitForInit() } diff --git a/go/libraries/doltcore/sqle/dsess/session.go b/go/libraries/doltcore/sqle/dsess/session.go index 036dccc309f..7816ee2c1c8 100644 --- a/go/libraries/doltcore/sqle/dsess/session.go +++ b/go/libraries/doltcore/sqle/dsess/session.go @@ -30,6 +30,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/env/actions" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -64,7 +65,7 @@ type DoltSession struct { mu *sync.Mutex fs filesys.Filesys writeSessProv WriteSessFunc - gcSafepointController *GCSafepointController + gcSafepointController *gcctx.GCSafepointController // If non-nil, this will be returned from ValidateSession. // Used by sqle/cluster to put a session into a terminal err state. @@ -102,7 +103,7 @@ func NewDoltSession( branchController *branch_control.Controller, statsProvider sql.StatsProvider, writeSessProv WriteSessFunc, - gcSafepointController *GCSafepointController, + gcSafepointController *gcctx.GCSafepointController, ) (*DoltSession, error) { username := conf.GetStringOrDefault(config.UserNameKey, "") email := conf.GetStringOrDefault(config.UserEmailKey, "") @@ -1767,7 +1768,7 @@ func (d *DoltSession) SessionEnd() { // dolt_gc accesses the safepoint controller for the current // sql engine through here. -func (d *DoltSession) GCSafepointController() *GCSafepointController { +func (d *DoltSession) GCSafepointController() *gcctx.GCSafepointController { return d.gcSafepointController } diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_harness.go b/go/libraries/doltcore/sqle/enginetest/dolt_harness.go index d9b42b7643d..0923306afe5 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_harness.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_harness.go @@ -33,6 +33,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" @@ -51,7 +52,7 @@ type DoltHarness struct { multiRepoEnv *env.MultiRepoEnv session *dsess.DoltSession branchControl *branch_control.Controller - gcSafepointController *dsess.GCSafepointController + gcSafepointController *gcctx.GCSafepointController parallelism int skippedQueries []string setupData []setup.SetupScript @@ -247,7 +248,7 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) { d.provider = doltProvider - d.gcSafepointController = dsess.NewGCSafepointController() + d.gcSafepointController = gcctx.NewGCSafepointController() bThreads := sql.NewBackgroundThreads() diff --git a/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go b/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go index 1f8c3f95ef9..4dc7221a19c 100644 --- a/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go +++ b/go/libraries/doltcore/sqle/logictest/dolt/doltharness.go @@ -30,6 +30,7 @@ import ( "github.com/shopspring/decimal" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/env" dsql "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" @@ -143,7 +144,7 @@ func innerInit(h *DoltHarness, dEnv *env.DoltEnv) error { return err } - gcSafepointController := dsess.NewGCSafepointController() + gcSafepointController := gcctx.NewGCSafepointController() config, _ := dEnv.Config.GetConfig(env.GlobalConfig) sqlCtx := dsql.NewTestSQLCtxWithProvider(ctx, pro, config, statspro.StatsNoop{}, gcSafepointController) diff --git a/go/libraries/doltcore/sqle/sqlddl_test.go b/go/libraries/doltcore/sqle/sqlddl_test.go index 329fd5f298f..74f166340d0 100644 --- a/go/libraries/doltcore/sqle/sqlddl_test.go +++ b/go/libraries/doltcore/sqle/sqlddl_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/row" @@ -1113,7 +1114,7 @@ func newTestEngine(ctx context.Context, dEnv *env.DoltEnv) (*gms.Engine, *sql.Co panic(err) } - gcSafepointController := dsess.NewGCSafepointController() + gcSafepointController := gcctx.NewGCSafepointController() doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), nil, nil, writer.NewWriteSession, gcSafepointController) if err != nil { diff --git a/go/libraries/doltcore/sqle/statspro/worker_test.go b/go/libraries/doltcore/sqle/statspro/worker_test.go index 42c27031edf..2e32bdf2229 100644 --- a/go/libraries/doltcore/sqle/statspro/worker_test.go +++ b/go/libraries/doltcore/sqle/statspro/worker_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/ref" @@ -808,7 +809,7 @@ func newTestEngine(ctx context.Context, dEnv *env.DoltEnv, threads *sql.Backgrou sc := NewStatsController(logrus.StandardLogger(), dEnv) - gcSafepointController := dsess.NewGCSafepointController() + gcSafepointController := gcctx.NewGCSafepointController() doltSession, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, dEnv.Config.WriteableConfig(), branch_control.CreateDefaultController(ctx), sc, writer.NewWriteSession, gcSafepointController) if err != nil { diff --git a/go/libraries/doltcore/sqle/testutil.go b/go/libraries/doltcore/sqle/testutil.go index 11d35169906..6d1b5d3840c 100644 --- a/go/libraries/doltcore/sqle/testutil.go +++ b/go/libraries/doltcore/sqle/testutil.go @@ -28,6 +28,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" @@ -123,7 +124,7 @@ func ExecuteSqlOnEngine(ctx *sql.Context, engine *sqle.Engine, statements string return nil } -func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider, config config.ReadWriteConfig, statsPro sql.StatsProvider, gcSafepointController *dsess.GCSafepointController) *sql.Context { +func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider, config config.ReadWriteConfig, statsPro sql.StatsProvider, gcSafepointController *gcctx.GCSafepointController) *sql.Context { s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config, branch_control.CreateDefaultController(ctx), statsPro, writer.NewWriteSession, gcSafepointController) if err != nil { panic(err) @@ -143,7 +144,7 @@ func NewTestEngine(dEnv *env.DoltEnv, ctx context.Context, db dsess.SqlDatabase) if err != nil { return nil, nil, err } - gcSafepointController := dsess.NewGCSafepointController() + gcSafepointController := gcctx.NewGCSafepointController() engine := sqle.NewDefault(pro)