Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,13 +1107,13 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
var catchpointWriter *catchpointWriter
start := time.Now()
ledgerGeneratecatchpointCount.Inc(nil)
err := ct.dbs.Rdb.Atomic(func(dbCtx context.Context, tx *sql.Tx) (err error) {
catchpointWriter, err = makeCatchpointWriter(ctx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk)
err := ct.dbs.Rdb.AtomicContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) {
catchpointWriter, err = makeCatchpointWriter(dbCtx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk)
if err != nil {
return
}
for more {
stepCtx, stepCancelFunction := context.WithTimeout(ctx, chunkExecutionDuration)
stepCtx, stepCancelFunction := context.WithTimeout(dbCtx, chunkExecutionDuration)
writeStepStartTime := time.Now()
more, err = catchpointWriter.WriteStep(stepCtx)
// accumulate the actual time we've spent writing in this step.
Expand All @@ -1135,7 +1135,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
if chunkExecutionDuration > longChunkExecutionDuration {
chunkExecutionDuration = longChunkExecutionDuration
}
case <-ctx.Done():
case <-dbCtx.Done():
//retryCatchpointCreation = true
err2 := catchpointWriter.Abort()
if err2 != nil {
Expand Down
8 changes: 4 additions & 4 deletions util/db/dbutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,13 @@ func (db *Accessor) IsSharedCacheConnection() bool {
// The return error of fn should be a native sqlite3.Error type or an error wrapping it.
// DO NOT return a custom error - the internal logic of Atomic expects an sqlite error and uses that value.
func (db *Accessor) Atomic(fn idemFn, extras ...interface{}) (err error) {
return db.atomic(fn, extras...)
return db.AtomicContext(context.Background(), fn, extras...)
}

// Atomic executes a piece of code with respect to the database atomically.
// AtomicContext executes a piece of code with respect to the database atomically.
// For transactions where readOnly is false, sync determines whether or not to wait for the result.
func (db *Accessor) atomic(fn idemFn, extras ...interface{}) (err error) {
// Like for Atomic, the return error of fn should be a native sqlite3.Error type or an error wrapping it.
func (db *Accessor) AtomicContext(ctx context.Context, fn idemFn, extras ...interface{}) (err error) {
atomicDeadline := time.Now().Add(time.Second)

// note that the sql library will drop panics inside an active transaction
Expand All @@ -244,7 +245,6 @@ func (db *Accessor) atomic(fn idemFn, extras ...interface{}) (err error) {

var tx *sql.Tx
var conn *sql.Conn
ctx := context.Background()

for i := 0; (i == 0) || dbretry(err); i++ {
if i > 0 {
Expand Down