Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/store/nbs/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
return err
}

_, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
_, err = j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
if err != nil {
return err
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu
}

// parse existing journal file
root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb)
root, err := j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb)
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions go/store/nbs/journal_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,16 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
defer lock.Unlock()

journalPath := filepath.Join(nomsDir, chunkJournalName)
jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666)
if err != nil {
return "", fmt.Errorf("could not open chunk journal file: %w", err)
}
defer jornalFile.Close()
defer journalFile.Close()

noOp := func(o int64, r journalRec) error { return nil }
// First verify that the journal has data loss.
var offset int64
offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil)
offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil)
if err == nil {
// No data loss detected, nothing to do.
return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed")
Expand All @@ -283,7 +283,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
}

// Seek back to the start, to perform a full copy.
if _, err = jornalFile.Seek(0, io.SeekStart); err != nil {
if _, err = journalFile.Seek(0, io.SeekStart); err != nil {
return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err)
}

Expand All @@ -296,7 +296,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
if err != nil {
return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err)
}
if _, err = io.Copy(saveFile, jornalFile); err != nil {
if _, err = io.Copy(saveFile, journalFile); err != nil {
return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err)
}

Expand All @@ -305,10 +305,10 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
}

// Now truncate the journal file to the last known good offset.
if err = jornalFile.Truncate(offset); err != nil {
if err = journalFile.Truncate(offset); err != nil {
return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err)
}
if err = jornalFile.Sync(); err != nil {
if err = journalFile.Sync(); err != nil {
return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err)
}

Expand All @@ -327,7 +327,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error)
//
// The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller
// to handle the situation in a context specific way.
func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) {
func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) {
var (
buf []byte
err error
Expand Down Expand Up @@ -444,7 +444,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f
// When we have a real file, we truncate anything which is beyond the current offset. Historically we put
// null bytes there, and there have been cases of garbage data being present instead of nulls. If there is any
// data beyond the current offset which we can parse and looks like data loss, we would have errored out above.
if f, ok := r.(*os.File); ok {
if f, ok := r.(*os.File); ok && tryTruncate {
err = f.Truncate(off)
if err != nil {
return 0, err
Expand Down
12 changes: 6 additions & 6 deletions go/store/nbs/journal_record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) {
}

var recoverErr error
n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about a test where we pass false and watch it do the right thing?

assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
require.NoError(t, err)
Expand All @@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) {
// write a bogus record to the end and verify that we don't get an error
i, sum = 0, 0
writeCorruptJournalRecord(journal[off:])
n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e })
n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
assert.Equal(t, cnt, i)
assert.Equal(t, int(off), int(n))
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) {
}

var recoverErr error
_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e })
_, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e })

if td.lossExpected {
require.Error(t, err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) {
// no confidence in the rest of the test.
ctx := context.Background()
var recoverErr error
bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
require.NoError(t, err)
require.Equal(t, off, uint32(bytesRead))
require.Error(t, recoverErr) // We do expect a warning here, but no data loss.
Expand All @@ -312,7 +312,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) {
// Copy lost data into journal buffer at the test offset.
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData)

_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e })
_, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e })
require.Error(t, err)
require.True(t, errors.Is(err, ErrJournalDataLoss))
require.Error(t, recoverErr)
Expand Down Expand Up @@ -457,7 +457,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec
t.FailNow()
}

_, err := processJournalRecords(ctx, bytes.NewReader(journalData), 0, func(offset int64, rec journalRec) error {
_, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error {
records = append(records, testRecord{hash: rec.address, kind: rec.kind})
return nil
}, warnCb)
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/journal_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ io.Closer = &journalWriter{}
// are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we
// extend the journal index with one metadata flush before existing this function to save indexing
// progress.
func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) {
wr.lock.Lock()
defer wr.lock.Unlock()

Expand Down Expand Up @@ -276,7 +276,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
// process the non-indexed portion of the journal starting at |wr.indexed|,
// at minimum the non-indexed portion will include a root hash record.
// Index lookups are added to the ongoing batch to re-synchronize.
wr.off, err = processJournalRecords(ctx, wr.journal, wr.indexed, func(o int64, r journalRec) error {
wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error {
switch r.kind {
case chunkJournalRecKind:
rng := Range{
Expand Down
8 changes: 4 additions & 4 deletions go/store/nbs/journal_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter {
j, err := createJournalWriter(ctx, path)
require.NoError(t, err)
require.NotNil(t, j)
_, err = j.bootstrapJournal(ctx, nil, nil)
_, err = j.bootstrapJournal(ctx, true, nil, nil)
require.NoError(t, err)
return j
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) {
j, _, err := openJournalWriter(ctx, path)
require.NoError(t, err)
reflogBuffer := newReflogRingBuffer(10)
last, err = j.bootstrapJournal(ctx, reflogBuffer, nil)
last, err = j.bootstrapJournal(ctx, true, reflogBuffer, nil)
require.NoError(t, err)
assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()})

Expand Down Expand Up @@ -363,7 +363,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
require.NoError(t, err)
require.True(t, ok)
// bootstrap journal and validate chunk records
last, err := journal.bootstrapJournal(ctx, nil, nil)
last, err := journal.bootstrapJournal(ctx, true, nil, nil)
assert.NoError(t, err)
for _, e := range expected {
var act CompressedChunk
Expand Down Expand Up @@ -398,7 +398,7 @@ func TestJournalIndexBootstrap(t *testing.T) {
jnl, ok, err := openJournalWriter(ctx, idxPath)
require.NoError(t, err)
require.True(t, ok)
_, err = jnl.bootstrapJournal(ctx, nil, nil)
_, err = jnl.bootstrapJournal(ctx, true, nil, nil)
assert.Error(t, err)
})
}
Expand Down
173 changes: 173 additions & 0 deletions integration-tests/go-sql-server-driver/repro_10331_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Copyright 2026 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
// "context"
// "database/sql"
// sqldriver "database/sql/driver"
// "fmt"
// "strings"
"crypto/rand"
"encoding/base64"
"fmt"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

// "time"

// "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

// "golang.org/x/sync/errgroup"

driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)

// TestRegression10331 tests for the checksum error caused by concurrent executions of `dolt sql`
// while dolt sql-server is processing writes.
//
// https://github.com/dolthub/dolt/issues/10331
func TestRegression10331(t *testing.T) {
t.Parallel()
// This test is not 100% reliable at detecting errors quickly, so we run it a few times.
const testCount = 4
for i := range testCount {
t.Run(strconv.Itoa(i), func(t *testing.T) {
var ports DynamicResources
ports.global = &GlobalPorts
ports.t = t
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})

rs, err := u.MakeRepoStore()
require.NoError(t, err)

repo, err := rs.MakeRepo("regression_10331_test")
require.NoError(t, err)

srvSettings := &driver.Server{
Args: []string{"-P", `{{get_port "server_port"}}`},
DynamicPort: "server_port",
}
server := MakeServer(t, repo, srvSettings, &ports)
server.DBName = "regression_10331_test"

db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
defer db.Close()

ctx := t.Context()

func() {
conn, err := db.Conn(ctx)
require.NoError(t, err)
defer conn.Close()

// Create table and initial data.
_, err = conn.ExecContext(ctx, "CREATE TABLE data (id INT PRIMARY KEY AUTO_INCREMENT, val LONGTEXT)")
require.NoError(t, err)
for i := 0; i <= 50; i++ {
var bs [10240]byte
rand.Read(bs[:])
data := base64.StdEncoding.EncodeToString(bs[:])
_, err = conn.ExecContext(ctx, "INSERT INTO data (val) VALUES (?)", data)
require.NoError(t, err)
}
_, err = conn.ExecContext(ctx, "CALL DOLT_COMMIT('-Am', 'init with data')")
require.NoError(t, err)
}()

eg, ctx := errgroup.WithContext(ctx)
start := time.Now()

var successfulWrites, successfulReads int32
var errWrites, errReads int32
const numWriters = 8
const numReaders = 16
const testDuration = 8 * time.Second
for i := range numWriters {
eg.Go(func() error {
var bs [1024]byte
rand.Read(bs[:])
data := base64.StdEncoding.EncodeToString(bs[:])
j := 0
for {
if time.Since(start) > testDuration {
return nil
}
if ctx.Err() != nil {
return nil
}
out, err := func() (string, error) {
cmd := repo.DoltCmd("sql", "-r", "csv", "-q", fmt.Sprintf("INSERT INTO data (val) VALUES ('%s'); CALL DOLT_COMMIT('--allow-empty', '-Am', 'w%d c%d')", data, i, j))
out, err := cmd.CombinedOutput()
return string(out), err
}()
if err != nil {
t.Logf("error writing, %v", err)
atomic.AddInt32(&errWrites, 1)
if strings.Contains(out, "checksum") || strings.Contains(out, " EOF") {
return fmt.Errorf("error writing values %d: %s, %w", i, out, err)
}
} else {
atomic.AddInt32(&successfulWrites, 1)
}
j += 1
}
})
}
for i := range numReaders {
eg.Go(func() error {
j := 0
for {
if time.Since(start) > testDuration {
return nil
}
if ctx.Err() != nil {
return nil
}
out, err := func() (string, error) {
cmd := repo.DoltCmd("sql", "-r", "csv", "-q", "SELECT COUNT(*) FROM data; SELECT * FROM dolt_log LIMIT 20")
out, err := cmd.CombinedOutput()
return string(out), err
}()
if err != nil {
t.Logf("error reading, %v", err)
atomic.AddInt32(&errReads, 1)
if strings.Contains(out, "checksum") || strings.Contains(out, " EOF") {
return fmt.Errorf("READER error %d: %s, %w", i, out, err)
}
} else {
atomic.AddInt32(&successfulReads, 1)
}
j += 1
}
})
}

require.NoError(t, eg.Wait())
t.Logf("err writes: %d, err reads: %d", errWrites, errReads)
t.Logf("successful writes: %d, successful reads: %d", successfulWrites, successfulReads)
})
}
}
Loading