From b060139d135f53251be312db916a4fab23dd358f Mon Sep 17 00:00:00 2001 From: Shoham Chakraborty Date: Mon, 26 Jan 2026 13:58:19 +0800 Subject: [PATCH] fix race --- execution/stagedsync/exec3.go | 84 +++++++++++++------------- execution/stagedsync/exec3_metrics.go | 30 ++++----- execution/stagedsync/exec3_parallel.go | 38 ++++++------ execution/stagedsync/exec3_serial.go | 10 +-- 4 files changed, 80 insertions(+), 82 deletions(-) diff --git a/execution/stagedsync/exec3.go b/execution/stagedsync/exec3.go index cfa7f96aff2..9b5b0bf9367 100644 --- a/execution/stagedsync/exec3.go +++ b/execution/stagedsync/exec3.go @@ -232,24 +232,24 @@ func ExecV3(ctx context.Context, if parallel { pe := ¶llelExecutor{ txExecutor: txExecutor{ - cfg: cfg, - rs: rs, - doms: doms, - agg: agg, - isBlockProduction: isBlockProduction, - isForkValidation: isForkValidation, - isApplyingBlocks: isApplyingBlocks, - logger: logger, - logPrefix: execStage.LogPrefix(), - progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger), - enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle, - hooks: hooks, - lastCommittedTxNum: doms.TxNum(), - lastCommittedBlockNum: blockNum, - postValidator: postValidator, + cfg: cfg, + rs: rs, + doms: doms, + agg: agg, + isBlockProduction: isBlockProduction, + isForkValidation: isForkValidation, + isApplyingBlocks: isApplyingBlocks, + logger: logger, + logPrefix: execStage.LogPrefix(), + progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger), + enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle, + hooks: hooks, + postValidator: postValidator, }, workerCount: cfg.syncCfg.ExecWorkerCount, } + pe.lastCommittedTxNum.Store(doms.TxNum()) + pe.lastCommittedBlockNum.Store(blockNum) defer func() { pe.LogComplete(stepsInDb) @@ -258,29 +258,29 @@ func ExecV3(ctx context.Context, lastHeader, applyTx, execErr = pe.exec(ctx, execStage, u, startBlockNum, offsetFromBlockBeginning, maxBlockNum, blockLimit, initialTxNum, inputTxNum, initialCycle, applyTx, accumulator, readAhead, logEvery) - lastCommittedBlockNum = pe.lastCommittedBlockNum - lastCommittedTxNum = pe.lastCommittedTxNum + lastCommittedBlockNum = pe.lastCommittedBlockNum.Load() + lastCommittedTxNum = pe.lastCommittedTxNum.Load() } else { se := &serialExecutor{ txExecutor: txExecutor{ - cfg: cfg, - rs: rs, - doms: doms, - agg: agg, - u: u, - isBlockProduction: isBlockProduction, - isForkValidation: isForkValidation, - isApplyingBlocks: isApplyingBlocks, - applyTx: applyTx, - logger: logger, - logPrefix: execStage.LogPrefix(), - progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger), - enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle, - hooks: hooks, - lastCommittedTxNum: doms.TxNum(), - lastCommittedBlockNum: blockNum, - postValidator: postValidator, + cfg: cfg, + rs: rs, + doms: doms, + agg: agg, + u: u, + isBlockProduction: isBlockProduction, + isForkValidation: isForkValidation, + isApplyingBlocks: isApplyingBlocks, + applyTx: applyTx, + logger: logger, + logPrefix: execStage.LogPrefix(), + progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger), + enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle, + hooks: hooks, + postValidator: postValidator, }} + se.lastCommittedTxNum.Store(doms.TxNum()) + se.lastCommittedBlockNum.Store(blockNum) defer func() { se.LogComplete(stepsInDb) @@ -302,9 +302,9 @@ func ExecV3(ctx context.Context, return err } - se.lastCommittedBlockNum = lastHeader.Number.Uint64() - committedTransactions := se.domains().TxNum() - se.lastCommittedTxNum - se.lastCommittedTxNum = se.domains().TxNum() + se.lastCommittedBlockNum.Store(lastHeader.Number.Uint64()) + committedTransactions := se.domains().TxNum() - se.lastCommittedTxNum.Load() + se.lastCommittedTxNum.Store(se.domains().TxNum()) commitStart := time.Now() stepsInDb = rawdbhelpers.IdxStepsCountV3(applyTx, applyTx.Debug().StepSize()) @@ -334,8 +334,8 @@ func ExecV3(ctx context.Context, } } - lastCommittedBlockNum = se.lastCommittedBlockNum - lastCommittedTxNum = se.lastCommittedTxNum + lastCommittedBlockNum = se.lastCommittedBlockNum.Load() + lastCommittedTxNum = se.lastCommittedTxNum.Load() } if false && !isForkValidation { @@ -426,9 +426,9 @@ type txExecutor struct { lastExecutedBlockNum atomic.Int64 lastExecutedTxNum atomic.Int64 executedGas atomic.Int64 - lastCommittedBlockNum uint64 - lastCommittedTxNum uint64 - committedGas int64 + lastCommittedBlockNum atomic.Uint64 + lastCommittedTxNum atomic.Uint64 + committedGas atomic.Int64 execLoopGroup *errgroup.Group diff --git a/execution/stagedsync/exec3_metrics.go b/execution/stagedsync/exec3_metrics.go index fa06c165a61..18f972ea169 100644 --- a/execution/stagedsync/exec3_metrics.go +++ b/execution/stagedsync/exec3_metrics.go @@ -591,7 +591,7 @@ func (p *Progress) LogExecution(rs *state.StateV3, ex executor) { curTaskGasPerSec := int64(float64(curTaskGas) / seconds) - uncommitedGas := uint64(te.executedGas.Load() - te.committedGas) + uncommitedGas := uint64(te.executedGas.Load() - te.committedGas.Load()) sizeEstimate := rs.SizeEstimate() switch ex.(type) { @@ -745,12 +745,12 @@ func (p *Progress) LogCommitments(rs *state.StateV3, ex executor, commitStart ti currentTime := time.Now() interval := currentTime.Sub(p.prevCommitTime) - committedGasSec := uint64(float64(te.committedGas-p.prevCommittedGas) / interval.Seconds()) + committedGasSec := uint64(float64(te.committedGas.Load()-p.prevCommittedGas) / interval.Seconds()) var committedTxSec uint64 - if te.lastCommittedTxNum > p.prevCommittedTxNum { - committedTxSec = uint64(float64(te.lastCommittedTxNum-p.prevCommittedTxNum) / interval.Seconds()) + if te.lastCommittedTxNum.Load() > p.prevCommittedTxNum { + committedTxSec = uint64(float64(te.lastCommittedTxNum.Load()-p.prevCommittedTxNum) / interval.Seconds()) } - committedDiffBlocks := max(int64(te.lastCommittedBlockNum)-int64(p.prevCommittedBlockNum), 0) + committedDiffBlocks := max(int64(te.lastCommittedBlockNum.Load())-int64(p.prevCommittedBlockNum), 0) var commitedBlockDur time.Duration @@ -801,17 +801,17 @@ func (p *Progress) LogCommitments(rs *state.StateV3, ex executor, commitStart ti "buf", common.ByteCount(uint64(rs.Domains().Metrics().CachePutSize + rs.Domains().Metrics().CacheGetSize)), } - p.log("committed", suffix, te, rs, interval, te.lastCommittedBlockNum, committedDiffBlocks, - te.lastCommittedTxNum-p.prevCommittedTxNum, committedTxSec, committedGasSec, 0, stepsInDb, commitVals) + p.log("committed", suffix, te, rs, interval, te.lastCommittedBlockNum.Load(), committedDiffBlocks, + te.lastCommittedTxNum.Load()-p.prevCommittedTxNum, committedTxSec, committedGasSec, 0, stepsInDb, commitVals) p.prevDomainMetrics = updateExecDomainMetrics(te.doms.Metrics(), p.prevDomainMetrics, interval, false) p.prevCommitTime = currentTime - if te.lastCommittedTxNum > 0 { - p.prevCommittedTxNum = te.lastCommittedTxNum - p.prevCommittedGas = te.committedGas - p.prevCommittedBlockNum = te.lastCommittedBlockNum + if te.lastCommittedTxNum.Load() > 0 { + p.prevCommittedTxNum = te.lastCommittedTxNum.Load() + p.prevCommittedGas = te.committedGas.Load() + p.prevCommittedBlockNum = te.lastCommittedBlockNum.Load() } } @@ -830,19 +830,19 @@ func (p *Progress) LogComplete(rs *state.StateV3, ex executor, stepsInDb float64 suffix = " serial" } - gas := te.committedGas + gas := te.committedGas.Load() if gas == 0 { gas = te.executedGas.Load() } - lastTxNum := te.lastCommittedTxNum + lastTxNum := te.lastCommittedTxNum.Load() if lastTxNum == 0 { lastTxNum = uint64(te.lastExecutedTxNum.Load()) } - lastBlockNum := te.lastCommittedBlockNum + lastBlockNum := te.lastCommittedBlockNum.Load() if lastBlockNum == 0 { lastBlockNum = uint64(te.lastExecutedBlockNum.Load()) @@ -891,7 +891,7 @@ func (p *Progress) log(mode string, suffix string, te *txExecutor, rs *state.Sta if stepsInDb > 0 { vals = append(vals, []any{ "stepsInDB", fmt.Sprintf("%.2f", stepsInDb), - "step", fmt.Sprintf("%.1f", float64(te.lastCommittedTxNum)/float64(te.agg.StepSize())), + "step", fmt.Sprintf("%.1f", float64(te.lastCommittedTxNum.Load())/float64(te.agg.StepSize())), }...) } diff --git a/execution/stagedsync/exec3_parallel.go b/execution/stagedsync/exec3_parallel.go index 25a45594ffc..978513281fb 100644 --- a/execution/stagedsync/exec3_parallel.go +++ b/execution/stagedsync/exec3_parallel.go @@ -142,7 +142,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U var uncommittedGas int64 var flushPending bool var hasLoggedExecution bool - var hasLoggedCommittments bool + var hasLoggedCommittments atomic.Bool var commitStart time.Time var stepsInDb = rawdbhelpers.IdxStepsCountV3(rwTx, pe.agg.StepSize()) @@ -263,7 +263,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U if !dbg.BatchCommitments || shouldGenerateChangesets || lastBlockResult.BlockNum == maxBlockNum || applyResult.Exhausted != nil || pe.cfg.syncCfg.KeepExecutionProofs || - (flushPending && lastBlockResult.BlockNum > pe.lastCommittedBlockNum) { + (flushPending && lastBlockResult.BlockNum > pe.lastCommittedBlockNum.Load()) { resetExecGauges(ctx) @@ -301,7 +301,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U commitedBlocks := uint64(math.Round(float64(uncommittedBlocks) * progress)) if commitedBlocks > prevCommitedBlocks { - hasLoggedCommittments = true + hasLoggedCommittments.Store(true) pe.LogCommitments(commitStart, commitedBlocks-prevCommitedBlocks, committedTransactions-prevCommittedTransactions, @@ -325,8 +325,8 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U return case progress, ok := <-commitProgress: if !ok { - if !hasLoggedCommittments || time.Since(lastCommitedLog) > logInterval/20 { - hasLoggedCommittments = true + if !hasLoggedCommittments.Load() || time.Since(lastCommitedLog) > logInterval/20 { + hasLoggedCommittments.Store(true) pe.LogCommitments(commitStart, uint64(uncommittedBlocks), uncommittedTransactions, uint64(uncommittedGas), stepsInDb, lastProgress) @@ -386,8 +386,8 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U <-LogCommitmentsDone // make sure no async mutations by LogCommitments can happen at this point // fix these here - they will contain estimates after commit logging - pe.txExecutor.lastCommittedBlockNum = lastBlockResult.BlockNum - pe.txExecutor.lastCommittedTxNum = lastBlockResult.lastTxNum + pe.txExecutor.lastCommittedBlockNum.Store(lastBlockResult.BlockNum) + pe.txExecutor.lastCommittedTxNum.Store(lastBlockResult.lastTxNum) uncommittedBlocks = 0 uncommittedGas = 0 uncommittedTransactions = 0 @@ -440,8 +440,13 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U pe.LogExecution() } - if !hasLoggedCommittments && !commitStart.IsZero() { - pe.LogCommitments(commitStart, pe.txExecutor.lastCommittedBlockNum, uncommittedTransactions, uint64(uncommittedGas), stepsInDb, lastProgress) + // Wait for all goroutines to complete before reading shared state + if err := pe.wait(ctx); err != nil { + return nil, rwTx, err + } + + if !hasLoggedCommittments.Load() && !commitStart.IsZero() { + pe.LogCommitments(commitStart, pe.txExecutor.lastCommittedBlockNum.Load(), uncommittedTransactions, uint64(uncommittedGas), stepsInDb, lastProgress) } if execErr != nil { @@ -450,17 +455,10 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U } } - if err := pe.wait(ctx); err != nil { + if err = execStage.Update(rwTx, pe.lastCommittedBlockNum.Load()); err != nil { return nil, rwTx, err } - if err = execStage.Update(rwTx, pe.lastCommittedBlockNum); err != nil { - return nil, rwTx, err - } - - if err = pe.wait(ctx); err != nil { - return nil, rwTx, fmt.Errorf("wait failed: %w", err) - } return lastHeader, rwTx, execErr } @@ -475,9 +473,9 @@ func (pe *parallelExecutor) LogExecution() { } func (pe *parallelExecutor) LogCommitments(commitStart time.Time, committedBlocks uint64, committedTransactions uint64, committedGas uint64, stepsInDb float64, lastProgress commitment.CommitProgress) { - pe.committedGas += int64(committedGas) - pe.txExecutor.lastCommittedBlockNum += committedBlocks - pe.txExecutor.lastCommittedTxNum += committedTransactions + pe.committedGas.Add(int64(committedGas)) + pe.txExecutor.lastCommittedBlockNum.Add(committedBlocks) + pe.txExecutor.lastCommittedTxNum.Add(committedTransactions) pe.progress.LogCommitments(pe.rs.StateV3, pe, commitStart, stepsInDb, lastProgress) if domainMetrics := pe.domains().LogMetrics(); len(domainMetrics) > 0 { pe.logger.Info(fmt.Sprintf("[%s] domain reads", pe.logPrefix), domainMetrics...) diff --git a/execution/stagedsync/exec3_serial.go b/execution/stagedsync/exec3_serial.go index d040021af58..76d307084e9 100644 --- a/execution/stagedsync/exec3_serial.go +++ b/execution/stagedsync/exec3_serial.go @@ -198,8 +198,8 @@ func (se *serialExecutor) exec(ctx context.Context, execStage *StageState, u Unw return b.HeaderNoCopy(), rwTx, nil } resetCommitmentGauges(ctx) - se.txExecutor.lastCommittedBlockNum = b.NumberU64() - se.txExecutor.lastCommittedTxNum = inputTxNum + se.txExecutor.lastCommittedBlockNum.Store(b.NumberU64()) + se.txExecutor.lastCommittedTxNum.Store(inputTxNum) se.logger.Info( "periodic commit check", "block", se.doms.BlockNum(), @@ -238,9 +238,9 @@ func (se *serialExecutor) LogExecution() { } func (se *serialExecutor) LogCommitments(commitStart time.Time, committedBlocks uint64, committedTransactions uint64, committedGas uint64, stepsInDb float64, lastProgress commitment.CommitProgress) { - se.committedGas += int64(committedGas) - se.txExecutor.lastCommittedBlockNum += committedBlocks - se.txExecutor.lastCommittedTxNum += committedTransactions + se.committedGas.Add(int64(committedGas)) + se.txExecutor.lastCommittedBlockNum.Add(committedBlocks) + se.txExecutor.lastCommittedTxNum.Add(committedTransactions) se.progress.LogCommitments(se.rs.StateV3, se, commitStart, stepsInDb, lastProgress) }