diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index f9f40cae04e..d5b3a0cf061 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -386,10 +386,11 @@ func (tx *DoltTransaction) doCommit( if !ok { return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName) } + normalizedDbName := strings.ToLower(branchState.dbState.dbName) // Load the start state for this working set from the noms root at tx start // Get the base DB name from the db state, not the branch state - startPoint, ok := tx.dbStartPoints[strings.ToLower(branchState.dbState.dbName)] + startPoint, ok := tx.dbStartPoints[normalizedDbName] if !ok { return nil, nil, fmt.Errorf("database %s unknown to transaction, this is a bug", dbName) } @@ -403,7 +404,7 @@ func (tx *DoltTransaction) doCommit( mergeOpts := branchState.EditOpts() - lockID := dbName + "\u0000" + workingSet.Ref().String() + lockID := normalizedDbName + "\u0000" + workingSet.Ref().String() for i := 0; i < maxTxCommitRetries; i++ { updatedWs, newCommit, err := func() (*doltdb.WorkingSet, *doltdb.Commit, error) { @@ -501,7 +502,6 @@ func (tx *DoltTransaction) mergeRoots( workingSet *doltdb.WorkingSet, mergeOpts editor.Options, ) (*doltdb.WorkingSet, error) { - tableResolver, err := GetTableResolver(ctx, dbName) if err != nil { return nil, err diff --git a/integration-tests/go-sql-server-driver/concurrent_writes_test.go b/integration-tests/go-sql-server-driver/concurrent_writes_test.go new file mode 100644 index 00000000000..da6c203bfb2 --- /dev/null +++ b/integration-tests/go-sql-server-driver/concurrent_writes_test.go @@ -0,0 +1,134 @@ +// Copyright 2026 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +// TestConcurrentWrites verifies concurrent write behavior and transaction locking in the SQL server driver. +func TestConcurrentWrites(t *testing.T) { + t.Parallel() + var ports DynamicResources + ports.global = &GlobalPorts + ports.t = t + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("concurrent_writes_test") + require.NoError(t, err) + + srvSettings := &driver.Server{ + Args: []string{"-P", `{{get_port "server_port"}}`}, + DynamicPort: "server_port", + } + server := MakeServer(t, repo, srvSettings, &ports) + server.DBName = "concurrent_writes_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + db.SetMaxIdleConns(0) + defer func() { + require.NoError(t, db.Close()) + }() + ctx := t.Context() + func() { + conn, err := db.Conn(ctx) + require.NoError(t, err) + defer conn.Close() + // Create table and initial data. + _, err = conn.ExecContext(ctx, "CREATE TABLE data (id VARCHAR(64) PRIMARY KEY, worker INT, data TEXT, created_at TIMESTAMP)") + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "CALL DOLT_COMMIT('-Am', 'init with table')") + require.NoError(t, err) + }() + + eg, ctx := errgroup.WithContext(ctx) + start := time.Now() + + nextInt := uint32(0) + const numWriters = 32 + const testDuration = 8 * time.Second + startCh := make(chan struct{}) + for i := range numWriters { + eg.Go(func() error { + select { + case <-startCh: + case <-ctx.Done(): + return nil + } + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + defer db.Close() + db.SetMaxOpenConns(1) + conn, err := db.Conn(ctx) + if err != nil { + return err + } + defer conn.Close() + j := 0 + for { + if time.Since(start) > testDuration { + return nil + } + if ctx.Err() != nil { + return nil + } + key := fmt.Sprintf("main-%d-%d", i, j) + _, err := conn.ExecContext(ctx, "INSERT INTO data VALUES (?,?,?,?)", key, i, key, time.Now()) + if err != nil { + return err + } + atomic.AddUint32(&nextInt, 1) + _, err = conn.ExecContext(ctx, fmt.Sprintf("CALL DOLT_COMMIT('-Am', 'insert %s')", key)) + if err != nil { + return err + } + j += 1 + } + }) + } + time.Sleep(500 * time.Millisecond) + close(startCh) + require.NoError(t, eg.Wait()) + t.Logf("wrote %d", nextInt) + ctx = t.Context() + conn, err := db.Conn(ctx) + require.NoError(t, err) + defer func () { + require.NoError(t, conn.Close()) + }() + var i int + err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM data").Scan(&i) + require.NoError(t, err) + t.Logf("read %d", i) + err = conn.QueryRowContext(ctx, "SELECT COUNT(*) FROM dolt_log").Scan(&i) + require.NoError(t, err) + t.Logf("created %d commits", i) +} diff --git a/integration-tests/go-sql-server-driver/repro_10331_test.go b/integration-tests/go-sql-server-driver/repro_10331_test.go index 516095c96a9..04809822ce3 100644 --- a/integration-tests/go-sql-server-driver/repro_10331_test.go +++ b/integration-tests/go-sql-server-driver/repro_10331_test.go @@ -15,11 +15,6 @@ package main import ( - // "context" - // "database/sql" - // sqldriver "database/sql/driver" - // "fmt" - // "strings" "crypto/rand" "encoding/base64" "fmt" @@ -29,14 +24,9 @@ import ( "testing" "time" - // "time" - - // "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" - // "golang.org/x/sync/errgroup" - driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" )