Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/valctx"
)

// SqlEngine packages up the context necessary to run sql queries against dsqle.
type SqlEngine struct {
provider sql.DatabaseProvider
contextFactory contextFactory
ContextFactory sql.ContextFactory
dsessFactory sessionFactory
engine *gms.Engine
fs filesys.Filesys
Expand Down Expand Up @@ -92,6 +93,19 @@ func NewSqlEngine(
mrEnv *env.MultiRepoEnv,
config *SqlEngineConfig,
) (*SqlEngine, error) {
// Context validation is a testing mode that we run Dolt in
// during integration tests. It asserts that `context.Context`
// instances which reach the storage layer have gone through
// GC session lifecycle callbacks. This is only relevant in
// sql mode, so we only enable it here. This is potentially
// relevant in non-sql-server contexts, because things like
// replication and events can still cause concurrency during a
// GC, so we put this here instead of in sql-server.
const contextValidationEnabledEnvVar = "DOLT_CONTEXT_VALIDATION_ENABLED"
if val := os.Getenv(contextValidationEnabledEnvVar); val != "" && val != "0" && strings.ToLower(val) != "false" {
valctx.EnableContextValidation()
}

gcSafepointController := gcctx.NewGCSafepointController()
ctx = gcctx.WithGCSafepointController(ctx, gcSafepointController)

Expand Down Expand Up @@ -230,8 +244,8 @@ func NewSqlEngine(
engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit)
sqlEngine.provider = pro
sqlEngine.contextFactory = sqlContextFactory
sqlEngine.dsessFactory = sessFactory
sqlEngine.ContextFactory = sqlContextFactory
sqlEngine.engine = engine
sqlEngine.fs = pro.FileSystem()

Expand Down Expand Up @@ -263,7 +277,7 @@ func NewSqlEngine(
}

if engine.EventScheduler == nil {
err = configureEventScheduler(config, engine, sqlEngine.contextFactory, sessFactory, pro)
err = configureEventScheduler(config, engine, sqlEngine.ContextFactory, sessFactory, pro)
if err != nil {
return nil, err
}
Expand All @@ -275,7 +289,7 @@ func NewSqlEngine(
return nil, err
}

err = configureBinlogReplicaController(config, engine, sqlEngine.contextFactory, binLogSession)
err = configureBinlogReplicaController(config, engine, sqlEngine.ContextFactory, binLogSession)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -314,6 +328,9 @@ func (se *SqlEngine) InitStats(ctx context.Context) error {
if err != nil {
return err
}
defer sql.SessionEnd(sqlCtx.Session)
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
dbs := pro.AllDatabases(sqlCtx)
statsPro := se.GetUnderlyingEngine().Analyzer.Catalog.StatsProvider
if sc, ok := statsPro.(*statspro.StatsController); ok {
Expand All @@ -328,7 +345,7 @@ func (se *SqlEngine) InitStats(ctx context.Context) error {
sqlDbs = append(sqlDbs, db)
}

err = sc.Init(ctx, pro, se.NewDefaultContext, sqlDbs)
err = sc.Init(sqlCtx, pro, se.NewDefaultContext, sqlDbs)
if err != nil {
return err
}
Expand All @@ -355,7 +372,7 @@ func (se *SqlEngine) Databases(ctx *sql.Context) []dsess.SqlDatabase {

// NewContext returns a new sql.Context with the given session.
func (se *SqlEngine) NewContext(ctx context.Context, session sql.Session) (*sql.Context, error) {
return se.contextFactory(ctx, session)
return se.ContextFactory(ctx, sql.WithSession(session)), nil
}

// NewDefaultContext returns a new sql.Context with a new default dolt session.
Expand All @@ -364,7 +381,7 @@ func (se *SqlEngine) NewDefaultContext(ctx context.Context) (*sql.Context, error
if err != nil {
return nil, err
}
return se.contextFactory(ctx, session)
return se.ContextFactory(ctx, sql.WithSession(session)), nil
}

// NewLocalContext returns a new |sql.Context| with its client set to |root|
Expand Down Expand Up @@ -416,11 +433,8 @@ func (se *SqlEngine) Close() error {
}

// configureBinlogReplicaController configures the binlog replication controller with the |engine|.
func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engine, ctxFactory contextFactory, session *dsess.DoltSession) error {
executionCtx, err := ctxFactory(context.Background(), session)
if err != nil {
return err
}
func configureBinlogReplicaController(config *SqlEngineConfig, engine *gms.Engine, ctxFactory sql.ContextFactory, session *dsess.DoltSession) error {
executionCtx := ctxFactory(context.Background(), sql.WithSession(session))
dblr.DoltBinlogReplicaController.SetExecutionContext(executionCtx)
dblr.DoltBinlogReplicaController.SetEngine(engine)
engine.Analyzer.Catalog.BinlogReplicaController = config.BinlogReplicaController
Expand All @@ -442,14 +456,14 @@ func configureBinlogPrimaryController(engine *gms.Engine) error {

// configureEventScheduler configures the event scheduler with the |engine| for executing events, a |sessFactory|
// for creating sessions, and a DoltDatabaseProvider, |pro|.
func configureEventScheduler(config *SqlEngineConfig, engine *gms.Engine, ctxFactory contextFactory, sessFactory sessionFactory, pro *dsqle.DoltDatabaseProvider) error {
func configureEventScheduler(config *SqlEngineConfig, engine *gms.Engine, ctxFactory sql.ContextFactory, sessFactory sessionFactory, pro *dsqle.DoltDatabaseProvider) error {
// getCtxFunc is used to create new session with a new context for event scheduler.
getCtxFunc := func() (*sql.Context, error) {
sess, err := sessFactory(sql.NewBaseSession(), pro)
if err != nil {
return nil, err
}
return ctxFactory(context.Background(), sess)
return ctxFactory(context.Background(), sql.WithSession(sess)), nil
}

// A hidden env var allows overriding the event scheduler period for testing. This option is not
Expand All @@ -471,9 +485,13 @@ func configureEventScheduler(config *SqlEngineConfig, engine *gms.Engine, ctxFac
}

// sqlContextFactory returns a contextFactory that creates a new sql.Context with the given session
func sqlContextFactory(ctx context.Context, session sql.Session) (*sql.Context, error) {
sqlCtx := sql.NewContext(ctx, sql.WithSession(session))
return sqlCtx, nil
func sqlContextFactory(ctx context.Context, opts ...sql.ContextOption) *sql.Context {
ctx = valctx.WithContextValidation(ctx)
sqlCtx := sql.NewContext(ctx, opts...)
if sqlCtx.Session != nil {
valctx.SetContextValidation(ctx, dsess.DSessFromSess(sqlCtx.Session).Validate)
}
return sqlCtx
}

// doltSessionFactory returns a sessionFactory that creates a new DoltSession
Expand Down Expand Up @@ -521,6 +539,7 @@ func NewSqlEngineForEnv(ctx context.Context, dEnv *env.DoltEnv, options ...Confi
if err != nil {
return nil, "", err
}

if err := engine.InitStats(ctx); err != nil {
return nil, "", err
}
Expand Down
8 changes: 4 additions & 4 deletions go/cmd/dolt/commands/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (cmd ShowCmd) Exec(ctx context.Context, commandStr string, args []string, d
return 1
}

if !opts.pretty && !dEnv.DoltDB(ctx).Format().UsesFlatbuffers() {
if !opts.pretty && !dEnv.DoltDB(sqlCtx).Format().UsesFlatbuffers() {
cli.PrintErrln("`dolt show --no-pretty` or `dolt show (BRANCHNAME)` is not supported when using old LD_1 storage format.")
return 1
}
Expand All @@ -180,7 +180,7 @@ func (cmd ShowCmd) Exec(ctx context.Context, commandStr string, args []string, d
for _, specRef := range resolvedRefs {
// If --no-pretty was supplied, always display the raw contents of the referenced object.
if !opts.pretty {
err := printRawValue(ctx, dEnv, specRef)
err := printRawValue(sqlCtx, dEnv, specRef)
if err != nil {
return handleErrAndExit(err)
}
Expand All @@ -202,12 +202,12 @@ func (cmd ShowCmd) Exec(ctx context.Context, commandStr string, args []string, d
cli.PrintErrln("`dolt show (NON_COMMIT_HASH)` requires a local environment. Not intended for common use.")
return 1
}
if !dEnv.DoltDB(ctx).Format().UsesFlatbuffers() {
if !dEnv.DoltDB(sqlCtx).Format().UsesFlatbuffers() {
cli.PrintErrln("`dolt show (NON_COMMIT_HASH)` is not supported when using old LD_1 storage format.")
return 1
}

value, err := getValueFromRefSpec(ctx, dEnv, specRef)
value, err := getValueFromRefSpec(sqlCtx, dEnv, specRef)
if err != nil {
err = fmt.Errorf("error resolving spec ref '%s': %w", specRef, err)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion go/cmd/dolt/commands/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,10 @@ func execShell(sqlCtx *sql.Context, qryist cli.Queryist, format engine.PrintResu
subCtx, stop := signal.NotifyContext(initialCtx, os.Interrupt, syscall.SIGTERM)
defer stop()

sqlCtx := sql.NewContext(subCtx, sql.WithSession(sqlCtx.Session))
var cancel func()
sqlCtx, cancel = sqlCtx.NewSubContext()
stopAfter := context.AfterFunc(subCtx, cancel)
defer stopAfter()

cmdType, subCmd, newQuery, err := preprocessQuery(query, lastSqlCmd, cliCtx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func ConfigureServices(
mySQLServer, err = server.NewServerWithHandler(
serverConf,
sqlEngine.GetUnderlyingEngine(),
sql.NewContext,
sqlEngine.ContextFactory,
newSessionBuilder(sqlEngine, cfg.ServerConfig),
metListener,
func(h mysql.Handler) (mysql.Handler, error) {
Expand All @@ -732,7 +732,7 @@ func ConfigureServices(
mySQLServer, err = server.NewServer(
serverConf,
sqlEngine.GetUnderlyingEngine(),
sql.NewContext,
sqlEngine.ContextFactory,
newSessionBuilder(sqlEngine, cfg.ServerConfig),
metListener,
)
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/dolt/commands/tblcmds/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (cmd ExportCmd) Exec(ctx context.Context, commandStr string, args []string,
return commands.HandleVErrAndExitCode(errhand.BuildDError("Error creating reader for %s.", exOpts.SrcName()).AddCause(err).Build(), usage)
}

wr, verr := getTableWriter(ctx, root, dEnv, rd.GetSchema(), exOpts)
wr, verr := getTableWriter(sqlCtx, root, dEnv, rd.GetSchema(), exOpts)
if verr != nil {
return commands.HandleVErrAndExitCode(verr, usage)
}
Expand Down
41 changes: 27 additions & 14 deletions go/cmd/dolt/commands/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,37 +246,30 @@ func newLateBindingEngine(
Autocommit: true,
}

var lateBinder cli.LateBindQueryist = func(ctx2 context.Context) (cli.Queryist, *sql.Context, func(), error) {
var lateBinder cli.LateBindQueryist = func(ctx context.Context) (cli.Queryist, *sql.Context, func(), error) {
// We've deferred loading the database as long as we can.
// If we're binding the Queryist, that means that engine is actually
// going to be used.
mrEnv.ReloadDBs(ctx2)
mrEnv.ReloadDBs(ctx)

se, err := engine.NewSqlEngine(
ctx2,
ctx,
mrEnv,
config,
)
if err != nil {
return nil, nil, nil, err
}

if err := se.InitStats(ctx2); err != nil {
if err := se.InitStats(ctx); err != nil {
se.Close()
return nil, nil, nil, err
}

sqlCtx, err := se.NewDefaultContext(ctx2)
if err != nil {
return nil, nil, nil, err
}

// Whether we're running in shell mode or some other mode, sql commands from the command line always have a current
// database set when you begin using them.
sqlCtx.SetCurrentDatabase(database)

rawDb := se.GetUnderlyingEngine().Analyzer.Catalog.MySQLDb
salt, err := mysql.NewSalt()
if err != nil {
se.Close()
return nil, nil, nil, err
}

Expand All @@ -292,6 +285,7 @@ func newLateBindingEngine(

err := passwordValidate(rawDb, salt, dbUser, authResponse)
if err != nil {
se.Close()
return nil, nil, nil, err
}

Expand All @@ -303,9 +297,28 @@ func newLateBindingEngine(
rawDb.AddEphemeralSuperUser(ed, dbUser, config.ServerHost, "")
}

sqlCtx, err := se.NewDefaultContext(ctx)
if err != nil {
se.Close()
return nil, nil, nil, err
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this one need se.Close()? should we defer check if err != nil for closing every error retur?

}
// Whether we're running in shell mode or some other mode, sql commands from the command line always have a current
// database set when you begin using them.
sqlCtx.SetCurrentDatabase(database)

// For now, we treat the entire lifecycle of this
// sqlCtx as one big session-in-use window.
sql.SessionCommandBegin(sqlCtx.Session)

close := func() {
sql.SessionCommandEnd(sqlCtx.Session)
sql.SessionEnd(sqlCtx.Session)
se.Close()
}

// Set client to specified user
sqlCtx.Session.SetClient(sql.Client{User: dbUser, Address: config.ServerHost, Capabilities: 0})
return se, sqlCtx, func() { se.Close() }, nil
return se, sqlCtx, close, nil
}

return lateBinder, nil
Expand Down
3 changes: 3 additions & 0 deletions go/libraries/doltcore/doltdb/gcctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gcctx
import (
"context"

"github.com/dolthub/dolt/go/libraries/utils/valctx"
"github.com/dolthub/dolt/go/store/hash"
)

Expand Down Expand Up @@ -48,6 +49,8 @@ func WithGCSafepointController(ctx context.Context, controller *GCSafepointContr
controller: controller,
}
ret := context.WithValue(ctx, safepointControllerkey, state)
ret = valctx.WithContextValidation(ret)
valctx.SetContextValidation(ret, state.Validate)
return ret
}

Expand Down
4 changes: 4 additions & 0 deletions go/libraries/doltcore/sqle/database_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/concurrentmap"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/lockutil"
"github.com/dolthub/dolt/go/libraries/utils/valctx"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
Expand Down Expand Up @@ -136,6 +137,9 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files
for _, esp := range dprocedures.DoltProcedures {
externalProcedures.Register(esp)
}
if valctx.IsEnabled() {
externalProcedures.Register(dprocedures.NewTestValctxProcedure())
}

// If the specified |fs| is an in mem file system, default to using the InMemDoltDB dbFactoryUrl so that all
// databases are created with the same file system type.
Expand Down
Loading
Loading