Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/statspro"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/utils/config"
dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/valctx"
)
Expand Down Expand Up @@ -94,6 +95,8 @@ type SqlEngineConfig struct {
//
// Intended for embedded-driver use-cases that need to influence dbfactory / storage open behavior.
DBLoadParams map[string]interface{}

FatalBehavior dherrors.FatalBehavior
}

type SqlEngineConfigOption func(*SqlEngineConfig)
Expand Down
14 changes: 13 additions & 1 deletion go/cmd/dolt/commands/engine/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,17 @@ func newDatabase(ctx context.Context, name string, dEnv *env.DoltEnv, useBulkEdi
Deaf: deaf,
Tempdir: tmpDir,
}
return sqle.NewDatabase(ctx, name, dEnv.DbData(ctx), opts)
dbdata := dEnv.DbData(ctx)
// Databases registered with the SQL engine are always
// configured for FatalBehaviorCrash. These are local
// databases and it isn't necessarily safe to continue
// operating after an I/O error on the write path. This is in
// contrast to something like a remote in the context of a
// backup, replication or a push, where an I/O error is just
// an error to perform the requested operation.
//
// See also sqle/database_provider.go, where we do this when
// creating new databases as well.
dbdata.Ddb.SetCrashOnFatalError()
return sqle.NewDatabase(ctx, name, dbdata, opts)
}
16 changes: 16 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/earl"
dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
Expand Down Expand Up @@ -2030,6 +2031,21 @@ func (ddb *DoltDB) IterateRoots(cb func(root string, timestamp *time.Time) error
}
}

// SetCrashOnFatalError puts the store into a mode where it will
// crash the running process if there is a fatal I/O error which
// prevents Dolt from being able to continue safely while
// continuing to accept writes to this database. This is typically
// the correct behavior for a long-lived process working with a
// local, non-read-only database.
func (ddb *DoltDB) SetCrashOnFatalError() {
	cs := datas.ChunkStoreFromDatabase(ddb.db)
	// Only chunk stores that opt in by implementing SetFatalBehavior
	// (e.g. local NBS stores) are affected; other store types are
	// left with their default fatal-error handling.
	if nbs, ok := cs.(interface {
		SetFatalBehavior(dherrors.FatalBehavior)
	}); ok {
		nbs.SetFatalBehavior(dherrors.FatalBehaviorCrash)
	}
}

// An approximate representation of how large the on-disk storage is for a DoltDB.
type StoreSizes struct {
// For ChunkJournal stores, this will be size of the journal file. A size
Expand Down
1 change: 1 addition & 0 deletions go/libraries/doltcore/sqle/database_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ func (p *DoltDatabaseProvider) CreateCollatedDatabase(ctx *sql.Context, name str
if err != nil {
return err
}
newEnv.DoltDB(ctx).SetCrashOnFatalError()

updatedCollation, updatedSchemas := false, false

Expand Down
45 changes: 45 additions & 0 deletions go/libraries/utils/errors/panic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package errors

import (
	"fmt"
	"runtime/debug"
)

// FatalBehavior controls how Fatalf responds to a fatal error.
type FatalBehavior int

const (
	// FatalBehaviorError causes Fatalf to return the fatal error to the
	// caller as an ordinary error value.
	FatalBehaviorError FatalBehavior = iota

	// FatalBehaviorCrash causes Fatalf to crash the process immediately;
	// in this mode Fatalf never returns.
	FatalBehaviorCrash
)

// Fatalf signals a fatal error, and can be used in situations where the process may
// be entering an unsafe state due to the encountered error. If |behavior| is
// FatalBehaviorCrash, this function will never return. Otherwise, an error value is
// returned, built with fmt.Errorf on |msg| and |args|.
func Fatalf(behavior FatalBehavior, msg string, args ...any) error {
	if behavior != FatalBehaviorCrash {
		return fmt.Errorf("fatal error: "+msg, args...)
	}
	// Capture the calling goroutine's stack now: the panic below runs on
	// a fresh goroutine whose own trace would start at that goroutine's
	// entry point and omit the call site that actually hit the error.
	stack := debug.Stack()
	// Panic on a new goroutine so that no deferred recover() further up
	// the caller's stack can swallow the crash.
	go func() {
		panic(fmt.Sprintf("fatal error: "+msg, args...) + "\n\ncaller stack:\n" + string(stack))
	}()
	// Park this goroutine (without spinning) while the runtime tears the
	// process down; select{} blocks forever, unlike `for {}` which would
	// burn a CPU core until the crash completes.
	select {}
}
5 changes: 3 additions & 2 deletions go/store/nbs/archive_chunk_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
Expand Down Expand Up @@ -175,7 +176,7 @@ func (acs archiveChunkSource) currentSize() uint64 {
}

// reader returns a reader for the entire archive file.
func (acs archiveChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
func (acs archiveChunkSource) reader(ctx context.Context, _ dherrors.FatalBehavior) (io.ReadCloser, uint64, error) {
rd, err := acs.aRdr.reader.Reader(ctx)
if err != nil {
return nil, 0, err
Expand All @@ -198,7 +199,7 @@ func (acs archiveChunkSource) clone() (chunkSource, error) {
return archiveChunkSource{reader, acs.file}, nil
}

func (acs archiveChunkSource) getRecordRanges(_ context.Context, records []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ dherrors.FatalBehavior, records []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
result := make(map[hash.Hash]Range, len(records))
for i, req := range records {
if req.found {
Expand Down
13 changes: 7 additions & 6 deletions go/store/nbs/archive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
Expand Down Expand Up @@ -948,11 +949,11 @@ func TestArchiveGetRecordRanges(t *testing.T) {
records = append(records, getRecord{a: &h2[0], prefix: h2[0].Prefix(), found: false})
records = append(records, getRecord{a: &sharedHash, prefix: sharedHash.Prefix(), found: false})

rang1, _, err := src1.getRecordRanges(context.Background(), records, nil)
rang1, _, err := src1.getRecordRanges(context.Background(), dherrors.FatalBehaviorError, records, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(rang1))

rang2, _, err := src2.getRecordRanges(context.Background(), records, nil)
rang2, _, err := src2.getRecordRanges(context.Background(), dherrors.FatalBehaviorError, records, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(rang2))
_, ok := rang2[sharedHash]
Expand All @@ -962,11 +963,11 @@ func TestArchiveGetRecordRanges(t *testing.T) {
for i := range records {
records[i].found = false
}
rang1, _, err = src2.getRecordRanges(context.Background(), records, nil)
rang1, _, err = src2.getRecordRanges(context.Background(), dherrors.FatalBehaviorError, records, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(rang1))

rang2, _, err = src1.getRecordRanges(context.Background(), records, nil)
rang2, _, err = src1.getRecordRanges(context.Background(), dherrors.FatalBehaviorError, records, nil)
assert.NoError(t, err)
assert.Equal(t, 1, len(rang2))
_, ok = rang2[sharedHash]
Expand Down Expand Up @@ -1299,11 +1300,11 @@ func (tcs *testChunkSource) suffix() string {
panic("never used")
}

func (tcs *testChunkSource) reader(ctx context.Context) (io.ReadCloser, uint64, error) {
// reader satisfies the chunkSource interface for this test double; the
// tests that use testChunkSource never read raw table data, so any call
// here indicates a test bug.
func (tcs *testChunkSource) reader(ctx context.Context, _ dherrors.FatalBehavior) (io.ReadCloser, uint64, error) {
	panic("never used")
}

func (tcs *testChunkSource) getRecordRanges(ctx context.Context, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
func (tcs *testChunkSource) getRecordRanges(ctx context.Context, _ dherrors.FatalBehavior, requests []getRecord, keeper keeperF) (map[hash.Hash]Range, gcBehavior, error) {
panic("never used")
}

Expand Down
19 changes: 10 additions & 9 deletions go/store/nbs/aws_table_persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"

dherrors "github.com/dolthub/dolt/go/libraries/utils/errors"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (s3p awsTablePersister) key(k string) string {
return k
}

func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
func (s3p awsTablePersister) Persist(ctx context.Context, behavior dherrors.FatalBehavior, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
Expand Down Expand Up @@ -230,7 +231,7 @@ func (s partsByPartNum) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
func (s3p awsTablePersister) ConjoinAll(ctx context.Context, behavior dherrors.FatalBehavior, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
plan, err := planRangeCopyConjoin(ctx, sources, s3p.q, stats)
if err != nil {
return nil, nil, err
Expand All @@ -242,7 +243,7 @@ func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}

t1 := time.Now()
err = s3p.executeCompactionPlan(ctx, plan, plan.name.String()+plan.suffix)
err = s3p.executeCompactionPlan(ctx, behavior, plan, plan.name.String()+plan.suffix)
if err != nil {
return nil, nil, err
}
Expand All @@ -260,13 +261,13 @@ func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}
}

func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan compactionPlan, key string) error {
func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, behavior dherrors.FatalBehavior, plan compactionPlan, key string) error {
uploadID, err := s3p.startMultipartUpload(ctx, key)
if err != nil {
return err
}

multipartUpload, err := s3p.assembleTable(ctx, plan, key, uploadID)
multipartUpload, err := s3p.assembleTable(ctx, behavior, plan, key, uploadID)
if err != nil {
_ = s3p.abortMultipartUpload(ctx, key, uploadID)
return err
Expand All @@ -275,7 +276,7 @@ func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan com
return s3p.completeMultipartUpload(ctx, key, uploadID, multipartUpload)
}

func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionPlan, key, uploadID string) (*s3types.CompletedMultipartUpload, error) {
func (s3p awsTablePersister) assembleTable(ctx context.Context, behavior dherrors.FatalBehavior, plan compactionPlan, key, uploadID string) (*s3types.CompletedMultipartUpload, error) {
if len(plan.sources.sws) > maxS3Parts {
return nil, errors.New("exceeded maximum parts")
}
Expand All @@ -301,7 +302,7 @@ func (s3p awsTablePersister) assembleTable(ctx context.Context, plan compactionP
readWg.Add(1)
go func(m manualPart) {
defer readWg.Done()
err := m.readFull(ctx, buff)
err := m.readFull(ctx, behavior, buff)
if err != nil {
ae.SetIfError(fmt.Errorf("failed to read conjoin table data: %w", err))
}
Expand Down Expand Up @@ -438,8 +439,8 @@ type manualPart struct {
start, end int64
}

func (mp manualPart) readFull(ctx context.Context, buff []byte) error {
reader, _, err := mp.src.reader(ctx)
func (mp manualPart) readFull(ctx context.Context, behavior dherrors.FatalBehavior, buff []byte) error {
reader, _, err := mp.src.reader(ctx, behavior)
if err != nil {
return err
}
Expand Down
Loading
Loading