Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package engine

import (
"context"
"maps"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -87,6 +88,12 @@ type SqlEngineConfig struct {
EventSchedulerStatus eventscheduler.SchedulerStatus
BranchActivityTracking bool
EngineOverrides sql.EngineOverrides

// DBLoadParams are optional parameters passed through to database loading for local file-backed databases.
// These are merged into the params map used by doltdb/env load routines.
//
// Intended for embedded-driver use-cases that need to influence dbfactory / storage open behavior.
DBLoadParams map[string]interface{}
}

type SqlEngineConfigOption func(*SqlEngineConfig)
Expand Down Expand Up @@ -117,6 +124,22 @@ func NewSqlEngine(
gcctx.SessionCommandBegin(ctx)
defer gcctx.SessionCommandEnd(ctx)

// Thread DB load params into each environment before any DB is loaded.
// (For already-loaded envs, these will not affect the existing instance.)
if config != nil && len(config.DBLoadParams) > 0 {
_ = mrEnv.Iter(func(_ string, dEnv *env.DoltEnv) (stop bool, err error) {
if dEnv == nil {
return false, nil
}
if dEnv.DBLoadParams == nil {
dEnv.DBLoadParams = maps.Clone(config.DBLoadParams)
return false, nil
}
maps.Copy(dEnv.DBLoadParams, config.DBLoadParams)
return false, nil
})
}

dbs, locations, err := CollectDBs(ctx, mrEnv, config.Bulk)
if err != nil {
return nil, err
Expand Down
31 changes: 30 additions & 1 deletion go/libraries/doltcore/dbfactory/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ const (
DatabaseNameParam = "database_name"

MMapArchiveIndexesParam = "mmap_archive_indexes"

// DisableSingletonCacheParam disables the in-process singleton database cache for local file-backed databases.
// When set, each open will construct a fresh underlying store instead of reusing a cached instance.
//
// Intended for embedded-driver usage where callers want deterministic reopen semantics.
DisableSingletonCacheParam = "disable_singleton_cache"

// FailOnJournalLockTimeoutParam changes the journaling store open behavior to fail fast when the exclusive
// journal manifest lock cannot be acquired within Dolt's internal lock timeout, instead of falling back
// to opening the database in read-only mode.
//
// Intended for embedded-driver usage so higher layers can implement their own retry/backoff policy.
FailOnJournalLockTimeoutParam = "fail_on_journal_lock_timeout"
)

// DoltDataDir is the directory where noms files will be stored
Expand Down Expand Up @@ -128,6 +141,14 @@ func (fact FileFactory) PrepareDB(ctx context.Context, nbf *types.NomsBinFormat,

// CreateDB creates a local filesys backed database
func (fact FileFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]interface{}) (datas.Database, types.ValueReadWriter, tree.NodeStore, error) {
// Some embedded-driver use-cases require deterministic reopen semantics. When this flag is set,
// bypass the in-process singleton cache and always create a new underlying store.
if params != nil {
if _, ok := params[DisableSingletonCacheParam]; ok {
return fact.CreateDbNoCache(ctx, nbf, urlObj, params, nbs.JournalParserLoggingWarningsCb)
}
}

singletonLock.Lock()
defer singletonLock.Unlock()

Expand Down Expand Up @@ -179,7 +200,15 @@ func (fact FileFactory) CreateDbNoCache(ctx context.Context, nbf *types.NomsBinF
var newGenSt *nbs.NomsBlockStore
q := nbs.NewUnlimitedMemQuotaProvider()
if useJournal && chunkJournalFeatureFlag {
newGenSt, err = nbs.NewLocalJournalingStore(ctx, nbf.VersionString(), path, q, mmapArchiveIndexes, recCb)
// Allow higher layers (e.g. embedded driver) to opt into fail-fast lock behavior instead of
// falling back to read-only mode on lock timeout.
opts := nbs.JournalingStoreOptions{}
if params != nil {
if _, ok := params[FailOnJournalLockTimeoutParam]; ok {
opts.FailOnLockTimeout = true
}
}
newGenSt, err = nbs.NewLocalJournalingStoreWithOptions(ctx, nbf.VersionString(), path, q, mmapArchiveIndexes, recCb, opts)
} else {
newGenSt, err = nbs.NewLocalStore(ctx, nbf.VersionString(), path, defaultMemTableSize, q, mmapArchiveIndexes)
}
Expand Down
98 changes: 98 additions & 0 deletions go/libraries/doltcore/dbfactory/file_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbfactory

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/libraries/utils/earl"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/types"
)

func TestFileFactory_CreateDB_SingletonCacheAndBypass(t *testing.T) {
ctx := context.Background()
nbf := types.Format_Default

t.Run("default uses singleton cache", func(t *testing.T) {
root := t.TempDir()
nomsDir := filepath.Join(root, "noms")
require.NoError(t, os.MkdirAll(nomsDir, 0o755))

urlStr := earl.FileUrlFromPath(filepath.ToSlash(nomsDir), os.PathSeparator)

db1, _, _, err := CreateDB(ctx, nbf, urlStr, map[string]interface{}{ChunkJournalParam: struct{}{}})
require.NoError(t, err)
t.Cleanup(func() { _ = CloseAllLocalDatabases() })

db2, _, _, err := CreateDB(ctx, nbf, urlStr, map[string]interface{}{ChunkJournalParam: struct{}{}})
require.NoError(t, err)

require.True(t, db1 == db2, "expected singleton cache to return same DB instance")
})

t.Run("DisableSingletonCacheParam bypasses singleton cache", func(t *testing.T) {
root := t.TempDir()
nomsDir := filepath.Join(root, "noms")
require.NoError(t, os.MkdirAll(nomsDir, 0o755))

urlStr := earl.FileUrlFromPath(filepath.ToSlash(nomsDir), os.PathSeparator)
params := map[string]interface{}{
ChunkJournalParam: struct{}{},
DisableSingletonCacheParam: struct{}{},
}

db1, _, _, err := CreateDB(ctx, nbf, urlStr, params)
require.NoError(t, err)
t.Cleanup(func() { _ = db1.Close() })

db2, _, _, err := CreateDB(ctx, nbf, urlStr, params)
require.NoError(t, err)
t.Cleanup(func() { _ = db2.Close() })

require.True(t, db1 != db2, "expected bypass mode to return different DB instances")
})
}

func TestFileFactory_CreateDB_FailOnJournalLockTimeoutParam(t *testing.T) {
ctx := context.Background()
nbf := types.Format_Default

root := t.TempDir()
nomsDir := filepath.Join(root, "noms")
require.NoError(t, os.MkdirAll(nomsDir, 0o755))

urlStr := earl.FileUrlFromPath(filepath.ToSlash(nomsDir), os.PathSeparator)
params := map[string]interface{}{
ChunkJournalParam: struct{}{},
DisableSingletonCacheParam: struct{}{},
FailOnJournalLockTimeoutParam: struct{}{},
}

// First open takes the journal manifest lock and holds it until closed.
db1, _, _, err := CreateDB(ctx, nbf, urlStr, params)
require.NoError(t, err)
t.Cleanup(func() { _ = db1.Close() })

// Second open should fail fast when the fail-on-timeout flag is honored.
_, _, _, err = CreateDB(ctx, nbf, urlStr, params)
require.Error(t, err)
require.ErrorIs(t, err, nbs.ErrDatabaseLocked)
}
25 changes: 25 additions & 0 deletions go/libraries/doltcore/env/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"path/filepath"
"strings"
"sync"
Expand Down Expand Up @@ -97,6 +98,11 @@ type DoltEnv struct {
urlStr string
hdp HomeDirProvider

// DBLoadParams are optional parameters passed through to doltdb.LoadDoltDBWithParams when this environment
// loads its underlying DoltDB. This allows higher layers (e.g. SQL engine / embedded driver) to influence
// dbfactory / storage open behavior.
DBLoadParams map[string]interface{}

UserPassConfig *creds.DoltCredsForPass
}

Expand All @@ -121,6 +127,15 @@ func (dEnv *DoltEnv) DoltDB(ctx context.Context) *doltdb.DoltDB {

func (dEnv *DoltEnv) LoadDoltDBWithParams(ctx context.Context, nbf *types.NomsBinFormat, urlStr string, fs filesys.Filesys, params map[string]interface{}) error {
if dEnv.doltDB == nil {
// Merge any environment-level DB load params without mutating the caller's map.
if len(dEnv.DBLoadParams) > 0 {
if params == nil {
params = maps.Clone(dEnv.DBLoadParams)
} else {
params = maps.Clone(params)
maps.Copy(params, dEnv.DBLoadParams)
}
}
ddb, err := doltdb.LoadDoltDBWithParams(ctx, types.Format_Default, urlStr, fs, params)
if err != nil {
return err
Expand Down Expand Up @@ -223,6 +238,16 @@ func LoadDoltDB(ctx context.Context, fs filesys.Filesys, urlStr string, dEnv *Do
if mmapArchiveIndexes {
params = map[string]interface{}{dbfactory.MMapArchiveIndexesParam: struct{}{}}
}

// Merge any environment-level DB load params.
if len(dEnv.DBLoadParams) > 0 {
if params == nil {
params = make(map[string]interface{}, len(dEnv.DBLoadParams))
}
for k, v := range dEnv.DBLoadParams {
params[k] = v
}
}
ddb, dbLoadErr := doltdb.LoadDoltDBWithParams(ctx, types.Format_Default, urlStr, fs, params)
dEnv.doltDB = ddb
dEnv.DBLoadError = dbLoadErr
Expand Down
32 changes: 28 additions & 4 deletions go/libraries/doltcore/env/multi_repo_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package env
import (
"context"
"errors"
"maps"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -146,6 +147,16 @@ func MultiEnvForDirectory(
var dbName string = "dolt"
var newDEnv *DoltEnv = dEnv

// Use the process user's home dir provider for loading local filesystem envs. Some tests use a synthetic
// HomeDirProvider for in-memory envs (e.g. "/user/bheni"), which must not leak into LocalFS loads.
hdp := GetCurrentUserHomeDir

// Capture any caller-provided DB load params so we can apply them to newly created envs.
var dbLoadParams map[string]interface{}
if dEnv != nil && len(dEnv.DBLoadParams) > 0 {
dbLoadParams = maps.Clone(dEnv.DBLoadParams)
}

// InMemFS is used only for testing.
// All other FS Types should get a newly created Environment which will serve as the primary env in the MultiRepoEnv
if _, ok := dataDirFS.(*filesys.InMemFS); !ok {
Expand All @@ -156,7 +167,17 @@ func MultiEnvForDirectory(
envName := getRepoRootDir(path, string(os.PathSeparator))
dbName = dbfactory.DirToDBName(envName)

newDEnv = Load(ctx, GetCurrentUserHomeDir, dataDirFS, doltdb.LocalDirDoltDB, version)
// Always create a fresh environment rooted at dataDirFS. The invoking dEnv is not guaranteed to be a
// local filesystem env (it may be remote), so reusing it can break DB loading and dial provider setup.
newDEnv = LoadWithoutDB(ctx, hdp, dataDirFS, doltdb.LocalDirDoltDB, version)

// Apply any caller-provided DB load params before we load the DB so they affect dbfactory/storage open.
if len(dbLoadParams) > 0 {
newDEnv.DBLoadParams = maps.Clone(dbLoadParams)
}

// Preserve historical behavior: eagerly load the primary DB here.
LoadDoltDB(ctx, newDEnv.FS, newDEnv.urlStr, newDEnv)
}

mrEnv := &MultiRepoEnv{
Expand Down Expand Up @@ -199,12 +220,15 @@ func MultiEnvForDirectory(
}

// TODO: get rid of version altogether
version := ""
subVersion := ""
if dEnv != nil {
version = dEnv.Version
subVersion = dEnv.Version
}

newEnv := LoadWithoutDB(ctx, GetCurrentUserHomeDir, newFs, doltdb.LocalDirDoltDB, version)
newEnv := LoadWithoutDB(ctx, hdp, newFs, doltdb.LocalDirDoltDB, subVersion)
if len(dbLoadParams) > 0 {
newEnv.DBLoadParams = maps.Clone(dbLoadParams)
}
if newEnv.Valid() {
envSet[dbfactory.DirToDBName(dir)] = newEnv
} else {
Expand Down
11 changes: 10 additions & 1 deletion go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ const (
chunkJournalName = chunkJournalAddr // todo
)

// ErrDatabaseLocked indicates the database is currently locked by another Dolt process.
// This is returned when callers opt into fail-fast lock behavior for embedded usage.
var ErrDatabaseLocked = errors.New("the database is locked by another dolt process")

// reflogDisabled indicates whether access to the reflog has been disabled and if so, no chunk journal root references
// should be kept in memory. This is controlled by the DOLT_DISABLE_REFLOG env var and this var is ONLY written to
// during initialization. All access after initialization is read-only, so no additional locking is needed.
Expand Down Expand Up @@ -544,12 +548,17 @@ func (c journalConjoiner) chooseConjoinees(upstream []tableSpec) (conjoinees, ke
}

// newJournalManifest makes a new file manifest.
func newJournalManifest(ctx context.Context, dir string) (m *journalManifest, err error) {
// When failOnTimeout is true, callers want a hard error instead of falling back to read-only mode.
// (The behavior change is implemented separately; this is the plumbing flag.)
func newJournalManifest(ctx context.Context, dir string, failOnTimeout bool) (m *journalManifest, err error) {
lock := fslock.New(filepath.Join(dir, lockFileName))
// try to take the file lock. if we fail, make the manifest read-only.
// if we succeed, hold the file lock until we close the journalManifest
err = lock.LockWithTimeout(lockFileTimeout)
if errors.Is(err, fslock.ErrTimeout) {
if failOnTimeout {
return nil, ErrDatabaseLocked
}
lock, err = nil, nil // read only
} else if err != nil {
return nil, err
Expand Down
15 changes: 13 additions & 2 deletions go/store/nbs/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
t.Cleanup(func() { file.RemoveAll(dir) })
m, err := newJournalManifest(ctx, dir)
m, err := newJournalManifest(ctx, dir, false)
require.NoError(t, err)
q := NewUnlimitedMemQuotaProvider()
p := newFSTablePersister(dir, q, false)
Expand All @@ -47,7 +47,7 @@ func makeTestChunkJournal(t *testing.T) *ChunkJournal {
}

func openTestChunkJournal(t *testing.T, dir string) *ChunkJournal {
m, err := newJournalManifest(t.Context(), dir)
m, err := newJournalManifest(t.Context(), dir, false)
require.NoError(t, err)
q := NewUnlimitedMemQuotaProvider()
p := newFSTablePersister(dir, q, false)
Expand Down Expand Up @@ -96,6 +96,17 @@ func TestChunkJournalReadOnly(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, rosource)
})

t.Run("FailOnLockTimeoutReturnsErrDatabaseLocked", func(t *testing.T) {
// A rw journal holds the journalManifest lock. With failOnTimeout enabled, a concurrent open should
// return an error instead of falling back to read-only.
rw := makeTestChunkJournal(t)
assert.Equal(t, chunks.ExclusiveAccessMode(chunks.ExclusiveAccessMode_Exclusive), rw.AccessMode())

_, err := newJournalManifest(t.Context(), rw.backing.dir, true)
require.Error(t, err)
require.ErrorIs(t, err, ErrDatabaseLocked)
})
}

func TestChunkJournalPersist(t *testing.T) {
Expand Down
Loading
Loading