Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/gcctx"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/servercfg"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
Expand Down Expand Up @@ -92,6 +93,13 @@ func NewSqlEngine(
mrEnv *env.MultiRepoEnv,
config *SqlEngineConfig,
) (*SqlEngine, error) {
gcSafepointController := gcctx.NewGCSafepointController()
ctx = gcctx.WithGCSafepointController(ctx, gcSafepointController)

defer gcctx.SessionEnd(ctx)
gcctx.SessionCommandBegin(ctx)
defer gcctx.SessionCommandEnd(ctx)

dbs, locations, err := CollectDBs(ctx, mrEnv, config.Bulk)
if err != nil {
return nil, err
Expand Down Expand Up @@ -137,8 +145,6 @@ func NewSqlEngine(
locations = append(locations, nil)
}

gcSafepointController := dsess.NewGCSafepointController()

b := env.GetDefaultInitBranch(mrEnv.Config())
pro, err := dsqle.NewDoltDatabaseProviderWithDatabases(b, mrEnv.FileSystem(), all, locations)
if err != nil {
Expand Down Expand Up @@ -452,7 +458,7 @@ func sqlContextFactory(ctx context.Context, session sql.Session) (*sql.Context,
}

// doltSessionFactory returns a sessionFactory that creates a new DoltSession
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, gcSafepointController *dsess.GCSafepointController, autocommit bool) sessionFactory {
func doltSessionFactory(pro *dsqle.DoltDatabaseProvider, statsPro sql.StatsProvider, config config.ReadWriteConfig, bc *branch_control.Controller, gcSafepointController *gcctx.GCSafepointController, autocommit bool) sessionFactory {
return func(mysqlSess *sql.BaseSession, provider sql.DatabaseProvider) (*dsess.DoltSession, error) {
doltSession, err := dsess.NewDoltSession(mysqlSess, pro, config, bc, statsPro, writer.NewWriteSession, gcSafepointController)
if err != nil {
Expand Down
86 changes: 86 additions & 0 deletions go/libraries/doltcore/doltdb/gcctx/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gcctx

import (
"context"

"github.com/dolthub/dolt/go/store/hash"
)

// ctxKey is an unexported type used for this package's context keys, so
// that values stored under it can only be set and read from within this
// package.
type ctxKey int

// safepointControllerkey is the context key under which
// WithGCSafepointController stores its *ctxState.
var safepointControllerkey ctxKey

// ctxState is the value WithGCSafepointController stores in a Context.
// It is also what this package passes to the controller as the session
// for the lifecycle calls (SessionEnd, SessionCommandBegin,
// SessionCommandEnd and Validate).
type ctxState struct {
// controller is the GCSafepointController that receives this
// session's lifecycle events.
controller *GCSafepointController
}

// WithGCSafepointController returns a |Context| derived from |ctx| that
// registers GC safepoint lifecycle events with the given
// GCSafepointController.
//
// The lifecycle events themselves are delivered through the functions
// |SessionEnd|, |SessionCommandBegin| and |SessionCommandEnd| in this
// package. Every call allocates a fresh session state, so each
// returned Context is tracked by the controller as its own session.
//
// Sessions registered with the safepoint controller this way
// currently do not have a way to have their GC roots visited. As a
// consequence, they cannot hold database state in memory outside of
// lifecycle events. This is still useful for accessing doltdb.DoltDB
// data from things like background threads and interacting with the
// GC safepoint mechanism. All uses which occur from within a proper
// SQL context should instead use sql.Session{End,Command{Begin,End}}
// on the *DoltSession.
func WithGCSafepointController(ctx context.Context, controller *GCSafepointController) context.Context {
	return context.WithValue(ctx, safepointControllerkey, &ctxState{controller: controller})
}

// SessionEnd tells the GCSafepointController carried by |ctx| that the
// session represented by |ctx| has ended.
//
// |ctx| must carry the state installed by WithGCSafepointController;
// if it does not, the type assertion below panics.
func SessionEnd(ctx context.Context) {
	session := ctx.Value(safepointControllerkey).(*ctxState)
	session.controller.SessionEnd(session)
}

// SessionCommandBegin tells the GCSafepointController carried by |ctx|
// that the session represented by |ctx| is starting a command.
//
// |ctx| must carry the state installed by WithGCSafepointController;
// if it does not, the type assertion below panics.
func SessionCommandBegin(ctx context.Context) {
	session := ctx.Value(safepointControllerkey).(*ctxState)
	// The controller method returns an error which this wrapper has no
	// way to surface, so it is discarded.
	// NOTE(review): the controller appears to always return nil here — confirm.
	_ = session.controller.SessionCommandBegin(session)
}

// SessionCommandEnd tells the GCSafepointController carried by |ctx|
// that the session represented by |ctx| has finished its command.
//
// |ctx| must carry the state installed by WithGCSafepointController;
// if it does not, the type assertion below panics.
func SessionCommandEnd(ctx context.Context) {
	session := ctx.Value(safepointControllerkey).(*ctxState)
	session.controller.SessionCommandEnd(session)
}

// GetGCSafepointController returns the GCSafepointController that was
// attached to |ctx| by WithGCSafepointController, or nil when |ctx|
// carries no such state.
func GetGCSafepointController(ctx context.Context) *GCSafepointController {
	stored := ctx.Value(safepointControllerkey)
	if stored == nil {
		// Nothing was installed on this context chain.
		return nil
	}
	return stored.(*ctxState).controller
}

// GetValidate returns the Validate method of the session state carried
// by |ctx|, bound to that state, so a caller can invoke it later
// without holding on to |ctx|.
//
// |ctx| must carry the state installed by WithGCSafepointController;
// if it does not, the type assertion below panics.
func GetValidate(ctx context.Context) func() {
	session := ctx.Value(safepointControllerkey).(*ctxState)
	return session.Validate
}

// VisitGCRoots reports no GC roots for a context-based session: it
// ignores all of its arguments, never invokes the roots callback, and
// always returns nil.
func (*ctxState) VisitGCRoots(_ context.Context, _ string, _ func(hash.Hash) bool) error {
	return nil
}

// Validate asks this session's controller to validate the session,
// passing the session state itself as the session to check.
func (s *ctxState) Validate() {
	owner := s.controller
	owner.Validate(s)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package dsess
package gcctx

import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/dolthub/dolt/go/store/hash"
)

type GCSafepointController struct {
mu sync.Mutex
// All known sessions. The first command registers the session
// here and SessionEnd causes it to be removed.
sessions map[*DoltSession]*GCSafepointSessionState
sessions map[GCRootsProvider]*GCSafepointSessionState
}

// A GCRootsProvider is the thing that a GCSafepointController
// tracks. It also calls it a |session|, as it represents a client
// interacting with the database. The GCRootsProvider is registered
// with the Controller through the first SessionCommandBegin() call,
// and it goes through lifecycle callbacks which effect safepoint
// establishment when it gets passed to SessionEnd and
// SessionCommandEnd as well.
//
// A GCRootsProvider implements a single method |VisitGCRoots|, which
// will be called if a GC needs to establish a safepoint while the
// session is alive. The method is responsible for passing every live
// in-memory chunk address related to the given |db| to the provided
// |roots| callback. If it cannot do this for some reason, it should
// return an error, but in that case the GC will fail. A
// |VisitGCRoots| function will never be called after a
// |GCRootsProvider| is given to |gcctx.SessionCommandBegin| until it
// is given to |gcctx.SessionCommandEnd| again. If there is an
// outstanding call to |VisitGCRoots| when |sql.SessionCommandBegin|
// is called with a given roots provider, that call will block until
// the call to |VisitGCRoots| completes.
type GCRootsProvider interface {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably needs a doc for the type and its method

VisitGCRoots(ctx context.Context, db string, roots func(hash.Hash) bool) error
}

type GCSafepointSessionState struct {
Expand Down Expand Up @@ -97,7 +123,7 @@ type GCSafepointWaiter struct {

func NewGCSafepointController() *GCSafepointController {
return &GCSafepointController{
sessions: make(map[*DoltSession]*GCSafepointSessionState),
sessions: make(map[GCRootsProvider]*GCSafepointSessionState),
}
}

Expand All @@ -112,7 +138,7 @@ func NewGCSafepointController() *GCSafepointController {
//
// After creating a Waiter, it is an error to create a new Waiter before the |Wait| method of the
// original waiter has returned. This error is not guaranteed to always be detected.
func (c *GCSafepointController) Waiter(ctx context.Context, thisSession *DoltSession, visitQuiescedSession func(context.Context, *DoltSession) error) *GCSafepointWaiter {
func (c *GCSafepointController) Waiter(ctx context.Context, thisSession GCRootsProvider, visitQuiescedSession func(context.Context, GCRootsProvider) error) *GCSafepointWaiter {
c.mu.Lock()
defer c.mu.Unlock()
ret := &GCSafepointWaiter{controller: c}
Expand Down Expand Up @@ -227,7 +253,7 @@ func (w *GCSafepointWaiter) Wait(ctx context.Context) error {
// one command can be outstanding at a time, and whether a command
// is outstanding controls how |Waiter| treats the Session when it
// is setting up all Sessions to visit their GC roots.
func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error {
func (c *GCSafepointController) SessionCommandBegin(s GCRootsProvider) error {
c.mu.Lock()
defer c.mu.Unlock()
var state *GCSafepointSessionState
Expand All @@ -237,7 +263,7 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error {
c.sessions[s] = state
}
if state.OutstandingCommand {
panic("SessionBeginCommand called on a session that already had an outstanding command.")
panic("SessionCommandBegin called on a session that already had an outstanding command.")
}
// Step #2: Receiving from QuiesceCallbackDone blocks, then
// the callback for this Session is still outstanding. We
Expand All @@ -249,7 +275,7 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error {
c.mu.Lock()
if state.OutstandingCommand {
// Concurrent calls to SessionCommandBegin. Bad times...
panic("SessionBeginCommand called on a session that already had an outstanding command.")
panic("SessionCommandBegin called on a session that already had an outstanding command.")
}
}
// Step #3. Record that a command is running so that Waiter
Expand All @@ -259,11 +285,23 @@ func (c *GCSafepointController) SessionCommandBegin(s *DoltSession) error {
return nil
}

// Validate is called as part of valctx context validation. It asserts
// that |s| is registered with the controller and currently has an open
// command, and panics if either condition does not hold.
func (c *GCSafepointController) Validate(s GCRootsProvider) {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch registered := c.sessions[s]; {
	case registered == nil:
		panic("GCSafepointController.Validate; expected session with an open command, but no session registered with controller.")
	case !registered.OutstandingCommand:
		panic("GCSafepointController.Validate; expected session with an open command, but the registered session has OutstandingCommand == false.")
	}
}

// SessionCommandEnd marks the end of a session command. Its effects
// are that the session no longer has an OutstandingCommand and,
// if CommandEndCallback was non-nil, the callback itself has been
// called and the CommandEndCallback field has been reset to |nil|.
func (c *GCSafepointController) SessionCommandEnd(s *DoltSession) {
func (c *GCSafepointController) SessionCommandEnd(s GCRootsProvider) {
c.mu.Lock()
defer c.mu.Unlock()
state := c.sessions[s]
Expand All @@ -284,16 +322,16 @@ func (c *GCSafepointController) SessionCommandEnd(s *DoltSession) {
// if we already knew about it. It is an error to call this on a
// session which currently has an outstanding command.
//
// Because we only register sessions when the BeginCommand, it is
// Because we only register sessions on CommandBegin, it is
// possible to get a SessionEnd callback for a session that was
// never registered.
//
// This callback does not block for any outstanding |visitQuiescedSession|
// callback to be completed before allowing the session to unregister
// itself. It is an error for the application to call |SessionBeginCommand|
// itself. It is an error for the application to call |SessionCommandBegin|
// on a session after it has called |SessionEnd| on it, but that error
// is not necessarily detected.
func (c *GCSafepointController) SessionEnd(s *DoltSession) {
func (c *GCSafepointController) SessionEnd(s GCRootsProvider) {
c.mu.Lock()
defer c.mu.Unlock()
state := c.sessions[s]
Expand Down
Loading
Loading