diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 5f22b5d844..9c74f341fa 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -1107,13 +1107,13 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account var catchpointWriter *catchpointWriter start := time.Now() ledgerGeneratecatchpointCount.Inc(nil) - err := ct.dbs.Rdb.Atomic(func(dbCtx context.Context, tx *sql.Tx) (err error) { - catchpointWriter, err = makeCatchpointWriter(ctx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) + err := ct.dbs.Rdb.AtomicContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) { + catchpointWriter, err = makeCatchpointWriter(dbCtx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) if err != nil { return } for more { - stepCtx, stepCancelFunction := context.WithTimeout(ctx, chunkExecutionDuration) + stepCtx, stepCancelFunction := context.WithTimeout(dbCtx, chunkExecutionDuration) writeStepStartTime := time.Now() more, err = catchpointWriter.WriteStep(stepCtx) // accumulate the actual time we've spent writing in this step. @@ -1135,7 +1135,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account if chunkExecutionDuration > longChunkExecutionDuration { chunkExecutionDuration = longChunkExecutionDuration } - case <-ctx.Done(): + case <-dbCtx.Done(): //retryCatchpointCreation = true err2 := catchpointWriter.Abort() if err2 != nil { diff --git a/util/db/dbutil.go b/util/db/dbutil.go index b4e734c430..34a32320a1 100644 --- a/util/db/dbutil.go +++ b/util/db/dbutil.go @@ -212,12 +212,13 @@ func (db *Accessor) IsSharedCacheConnection() bool { // The return error of fn should be a native sqlite3.Error type or an error wrapping it. // DO NOT return a custom error - the internal logic of Atomic expects an sqlite error and uses that value. func (db *Accessor) Atomic(fn idemFn, extras ...interface{}) (err error) { - return db.atomic(fn, extras...) + return db.AtomicContext(context.Background(), fn, extras...) } -// Atomic executes a piece of code with respect to the database atomically. +// AtomicContext executes a piece of code with respect to the database atomically. // For transactions where readOnly is false, sync determines whether or not to wait for the result. -func (db *Accessor) atomic(fn idemFn, extras ...interface{}) (err error) { +// Like for Atomic, the return error of fn should be a native sqlite3.Error type or an error wrapping it. +func (db *Accessor) AtomicContext(ctx context.Context, fn idemFn, extras ...interface{}) (err error) { atomicDeadline := time.Now().Add(time.Second) // note that the sql library will drop panics inside an active transaction @@ -244,7 +245,6 @@ func (db *Accessor) atomic(fn idemFn, extras ...interface{}) (err error) { var tx *sql.Tx var conn *sql.Conn - ctx := context.Background() for i := 0; (i == 0) || dbretry(err); i++ { if i > 0 {