diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index c405558b43a..ab3dd860dfd 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -155,7 +155,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu return err } - _, err = j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb) + _, err = j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb) if err != nil { return err } @@ -183,7 +183,7 @@ func (j *ChunkJournal) bootstrapJournalWriter(ctx context.Context, warningsCb fu } // parse existing journal file - root, err := j.wr.bootstrapJournal(ctx, j.reflogRingBuffer, warningsCb) + root, err := j.wr.bootstrapJournal(ctx, canCreate, j.reflogRingBuffer, warningsCb) if err != nil { return err } diff --git a/go/store/nbs/journal_record.go b/go/store/nbs/journal_record.go index 8b0d683d7ca..d724ba3549d 100644 --- a/go/store/nbs/journal_record.go +++ b/go/store/nbs/journal_record.go @@ -264,16 +264,16 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) defer lock.Unlock() journalPath := filepath.Join(nomsDir, chunkJournalName) - jornalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666) + journalFile, err := os.OpenFile(journalPath, os.O_RDWR, 0666) if err != nil { return "", fmt.Errorf("could not open chunk journal file: %w", err) } - defer jornalFile.Close() + defer journalFile.Close() noOp := func(o int64, r journalRec) error { return nil } // First verify that the journal has data loss. var offset int64 - offset, err = processJournalRecords(context.Background(), jornalFile, 0, noOp, nil) + offset, err = processJournalRecords(context.Background(), journalFile, true /* tryTruncate */, 0, noOp, nil) if err == nil { // No data loss detected, nothing to do. 
return "", fmt.Errorf("no data loss detected in chunk journal file; no recovery performed") @@ -283,7 +283,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Seek back to the start, to perform a full copy. - if _, err = jornalFile.Seek(0, io.SeekStart); err != nil { + if _, err = journalFile.Seek(0, io.SeekStart); err != nil { return "", fmt.Errorf("could not seek to start of chunk journal file: %w", err) } @@ -296,7 +296,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) if err != nil { return "", fmt.Errorf("could not create backup of corrupted chunk journal file: %w", err) } - if _, err = io.Copy(saveFile, jornalFile); err != nil { + if _, err = io.Copy(saveFile, journalFile); err != nil { return "", fmt.Errorf("could not backup corrupted chunk journal file: %w", err) } @@ -305,10 +305,10 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) } // Now truncate the journal file to the last known good offset. - if err = jornalFile.Truncate(offset); err != nil { + if err = journalFile.Truncate(offset); err != nil { return "", fmt.Errorf("could not truncate corrupted chunk journal file: %w", err) } - if err = jornalFile.Sync(); err != nil { + if err = journalFile.Sync(); err != nil { return "", fmt.Errorf("could not sync truncated chunk journal file: %w", err) } @@ -327,7 +327,7 @@ func ReviveJournalWithDataLoss(nomsDir string) (preservePath string, err error) // // The |warningsCb| callback is called with any errors encountered that we automatically recover from. This allows the caller // to handle the situation in a context specific way. 
-func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { +func processJournalRecords(ctx context.Context, r io.ReadSeeker, tryTruncate bool, off int64, cb func(o int64, r journalRec) error, warningsCb func(error)) (int64, error) { var ( buf []byte err error @@ -444,7 +444,7 @@ func processJournalRecords(ctx context.Context, r io.ReadSeeker, off int64, cb f // When we have a real file, we truncate anything which is beyond the current offset. Historically we put // null bytes there, and there have been cases of garbage data being present instead of nulls. If there is any // data beyond the current offset which we can parse and looks like data loss, we would have errored out above. - if f, ok := r.(*os.File); ok { + if f, ok := r.(*os.File); ok && tryTruncate { err = f.Truncate(off) if err != nil { return 0, err diff --git a/go/store/nbs/journal_record_test.go b/go/store/nbs/journal_record_test.go index 5ca12007e7e..84034620e61 100644 --- a/go/store/nbs/journal_record_test.go +++ b/go/store/nbs/journal_record_test.go @@ -116,7 +116,7 @@ func TestProcessJournalRecords(t *testing.T) { } var recoverErr error - n, err := processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e }) + n, err := processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) assert.Equal(t, cnt, i) assert.Equal(t, int(off), int(n)) require.NoError(t, err) @@ -125,7 +125,7 @@ func TestProcessJournalRecords(t *testing.T) { // write a bogus record to the end and verify that we don't get an error i, sum = 0, 0 writeCorruptJournalRecord(journal[off:]) - n, err = processJournalRecords(ctx, bytes.NewReader(journal), 0, check, func(e error) { recoverErr = e }) + n, err = processJournalRecords(ctx, bytes.NewReader(journal), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) assert.Equal(t, cnt, i) 
assert.Equal(t, int(off), int(n)) @@ -227,7 +227,7 @@ func TestJournalForDataLoss(t *testing.T) { } var recoverErr error - _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, bytes.NewReader(journal[:off]), true, 0, check, func(e error) { recoverErr = e }) if td.lossExpected { require.Error(t, err) @@ -288,7 +288,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // no confidence in the rest of the test. ctx := context.Background() var recoverErr error - bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e }) + bytesRead, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.NoError(t, err) require.Equal(t, off, uint32(bytesRead)) require.Error(t, recoverErr) // We do expect a warning here, but no data loss. @@ -312,7 +312,7 @@ func TestJournalForDataLossOnBoundary(t *testing.T) { // Copy lost data into journal buffer at the test offset. 
copy(journalBuf[startPoint:startPoint+uint32(len(lostData))], lostData) - _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), 0, check, func(e error) { recoverErr = e }) + _, err := processJournalRecords(ctx, bytes.NewReader(journalBuf[:]), true, 0, check, func(e error) { recoverErr = e }) require.Error(t, err) require.True(t, errors.Is(err, ErrJournalDataLoss)) require.Error(t, recoverErr) @@ -457,7 +457,7 @@ func processJournalAndCollectRecords(t *testing.T, journalData []byte) []testRec t.FailNow() } - _, err := processJournalRecords(ctx, bytes.NewReader(journalData), 0, func(offset int64, rec journalRec) error { + _, err := processJournalRecords(ctx, bytes.NewReader(journalData), true, 0, func(offset int64, rec journalRec) error { records = append(records, testRecord{hash: rec.address, kind: rec.kind}) return nil }, warnCb) diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index a462d54e4b7..f9b16824ee9 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -163,7 +163,7 @@ var _ io.Closer = &journalWriter{} // are added to the novel ranges map. If the number of novel lookups exceeds |wr.maxNovel|, we // extend the journal index with one metadata flush before existing this function to save indexing // progress. -func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) { +func (wr *journalWriter) bootstrapJournal(ctx context.Context, canWrite bool, reflogRingBuffer *reflogRingBuffer, warningsCb func(error)) (last hash.Hash, err error) { wr.lock.Lock() defer wr.lock.Unlock() @@ -276,7 +276,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer // process the non-indexed portion of the journal starting at |wr.indexed|, // at minimum the non-indexed portion will include a root hash record. // Index lookups are added to the ongoing batch to re-synchronize. 
- wr.off, err = processJournalRecords(ctx, wr.journal, wr.indexed, func(o int64, r journalRec) error { + wr.off, err = processJournalRecords(ctx, wr.journal, canWrite, wr.indexed, func(o int64, r journalRec) error { switch r.kind { case chunkJournalRecKind: rng := Range{ diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index 27a540e45cf..cea880820f7 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -185,7 +185,7 @@ func newTestJournalWriter(t *testing.T, path string) *journalWriter { j, err := createJournalWriter(ctx, path) require.NoError(t, err) require.NotNil(t, j) - _, err = j.bootstrapJournal(ctx, nil, nil) + _, err = j.bootstrapJournal(ctx, true, nil, nil) require.NoError(t, err) return j } @@ -220,7 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) { j, _, err := openJournalWriter(ctx, path) require.NoError(t, err) reflogBuffer := newReflogRingBuffer(10) - last, err = j.bootstrapJournal(ctx, reflogBuffer, nil) + last, err = j.bootstrapJournal(ctx, true, reflogBuffer, nil) require.NoError(t, err) assertExpectedIterationOrder(t, reflogBuffer, []string{last.String()}) @@ -363,7 +363,7 @@ func TestJournalIndexBootstrap(t *testing.T) { require.NoError(t, err) require.True(t, ok) // bootstrap journal and validate chunk records - last, err := journal.bootstrapJournal(ctx, nil, nil) + last, err := journal.bootstrapJournal(ctx, true, nil, nil) assert.NoError(t, err) for _, e := range expected { var act CompressedChunk @@ -398,7 +398,7 @@ func TestJournalIndexBootstrap(t *testing.T) { jnl, ok, err := openJournalWriter(ctx, idxPath) require.NoError(t, err) require.True(t, ok) - _, err = jnl.bootstrapJournal(ctx, nil, nil) + _, err = jnl.bootstrapJournal(ctx, true, nil, nil) assert.Error(t, err) }) } diff --git a/integration-tests/go-sql-server-driver/repro_10331_test.go b/integration-tests/go-sql-server-driver/repro_10331_test.go new file mode 100644 index 00000000000..516095c96a9 
--- /dev/null
+++ b/integration-tests/go-sql-server-driver/repro_10331_test.go
@@ -0,0 +1,162 @@
+// Copyright 2026 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"crypto/rand"
+	"encoding/base64"
+	"fmt"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
+)
+
+// TestRegression10331 tests for the checksum error caused by concurrent executions of `dolt sql`
+// while dolt sql-server is processing writes.
+//
+// https://github.com/dolthub/dolt/issues/10331
+func TestRegression10331(t *testing.T) {
+	t.Parallel()
+	// This test is not 100% reliable at detecting errors quickly, so we run it a few times.
+	const testCount = 4
+	for i := range testCount {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			var ports DynamicResources
+			ports.global = &GlobalPorts
+			ports.t = t
+			u, err := driver.NewDoltUser()
+			require.NoError(t, err)
+			t.Cleanup(func() {
+				u.Cleanup()
+			})
+
+			rs, err := u.MakeRepoStore()
+			require.NoError(t, err)
+
+			repo, err := rs.MakeRepo("regression_10331_test")
+			require.NoError(t, err)
+
+			srvSettings := &driver.Server{
+				Args:        []string{"-P", `{{get_port "server_port"}}`},
+				DynamicPort: "server_port",
+			}
+			server := MakeServer(t, repo, srvSettings, &ports)
+			server.DBName = "regression_10331_test"
+
+			db, err := server.DB(driver.Connection{User: "root"})
+			require.NoError(t, err)
+			defer db.Close()
+
+			ctx := t.Context()
+
+			// Seed the database through the server connection so the CLI workers
+			// below start with an existing table and commit.
+			func() {
+				conn, err := db.Conn(ctx)
+				require.NoError(t, err)
+				defer conn.Close()
+
+				_, err = conn.ExecContext(ctx, "CREATE TABLE data (id INT PRIMARY KEY AUTO_INCREMENT, val LONGTEXT)")
+				require.NoError(t, err)
+				for row := 0; row <= 50; row++ {
+					var bs [10240]byte
+					_, err = rand.Read(bs[:])
+					require.NoError(t, err)
+					data := base64.StdEncoding.EncodeToString(bs[:])
+					_, err = conn.ExecContext(ctx, "INSERT INTO data (val) VALUES (?)", data)
+					require.NoError(t, err)
+				}
+				_, err = conn.ExecContext(ctx, "CALL DOLT_COMMIT('-Am', 'init with data')")
+				require.NoError(t, err)
+			}()
+
+			const (
+				numWriters   = 8
+				numReaders   = 16
+				testDuration = 8 * time.Second
+			)
+			var successfulWrites, successfulReads atomic.Int32
+			var errWrites, errReads atomic.Int32
+
+			eg, egCtx := errgroup.WithContext(ctx)
+			start := time.Now()
+
+			// done reports whether a worker should stop: either the time budget is spent
+			// or the group context was canceled (for example, another worker returned an error).
+			done := func() bool {
+				return time.Since(start) > testDuration || egCtx.Err() != nil
+			}
+			// corrupted reports whether CLI output contains one of the substrings this
+			// test treats as evidence of the regression.
+			corrupted := func(out string) bool {
+				return strings.Contains(out, "checksum") || strings.Contains(out, " EOF")
+			}
+
+			for writer := range numWriters {
+				eg.Go(func() error {
+					var bs [1024]byte
+					if _, err := rand.Read(bs[:]); err != nil {
+						return err
+					}
+					data := base64.StdEncoding.EncodeToString(bs[:])
+					for commit := 0; !done(); commit++ {
+						query := fmt.Sprintf("INSERT INTO data (val) VALUES ('%s'); CALL DOLT_COMMIT('--allow-empty', '-Am', 'w%d c%d')", data, writer, commit)
+						raw, err := repo.DoltCmd("sql", "-r", "csv", "-q", query).CombinedOutput()
+						if err == nil {
+							successfulWrites.Add(1)
+							continue
+						}
+						// Failures without the corruption signature are logged and counted,
+						// but only the signature itself fails the test.
+						t.Logf("error writing, %v", err)
+						errWrites.Add(1)
+						if out := string(raw); corrupted(out) {
+							return fmt.Errorf("error writing values %d: %s, %w", writer, out, err)
+						}
+					}
+					return nil
+				})
+			}
+			for reader := range numReaders {
+				eg.Go(func() error {
+					for !done() {
+						raw, err := repo.DoltCmd("sql", "-r", "csv", "-q", "SELECT COUNT(*) FROM data; SELECT * FROM dolt_log LIMIT 20").CombinedOutput()
+						if err == nil {
+							successfulReads.Add(1)
+							continue
+						}
+						t.Logf("error reading, %v", err)
+						errReads.Add(1)
+						if out := string(raw); corrupted(out) {
+							return fmt.Errorf("READER error %d: %s, %w", reader, out, err)
+						}
+					}
+					return nil
+				})
+			}
+
+			require.NoError(t, eg.Wait())
+			t.Logf("err writes: %d, err reads: %d", errWrites.Load(), errReads.Load())
+			t.Logf("successful writes: %d, successful reads: %d", successfulWrites.Load(), successfulReads.Load())
+		})
+	}
+}