Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 42 additions & 42 deletions execution/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,24 +232,24 @@ func ExecV3(ctx context.Context,
if parallel {
pe := &parallelExecutor{
txExecutor: txExecutor{
cfg: cfg,
rs: rs,
doms: doms,
agg: agg,
isBlockProduction: isBlockProduction,
isForkValidation: isForkValidation,
isApplyingBlocks: isApplyingBlocks,
logger: logger,
logPrefix: execStage.LogPrefix(),
progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger),
enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle,
hooks: hooks,
lastCommittedTxNum: doms.TxNum(),
lastCommittedBlockNum: blockNum,
postValidator: postValidator,
cfg: cfg,
rs: rs,
doms: doms,
agg: agg,
isBlockProduction: isBlockProduction,
isForkValidation: isForkValidation,
isApplyingBlocks: isApplyingBlocks,
logger: logger,
logPrefix: execStage.LogPrefix(),
progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger),
enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle,
hooks: hooks,
postValidator: postValidator,
},
workerCount: cfg.syncCfg.ExecWorkerCount,
}
pe.lastCommittedTxNum.Store(doms.TxNum())
pe.lastCommittedBlockNum.Store(blockNum)

defer func() {
pe.LogComplete(stepsInDb)
Expand All @@ -258,29 +258,29 @@ func ExecV3(ctx context.Context,
lastHeader, applyTx, execErr = pe.exec(ctx, execStage, u, startBlockNum, offsetFromBlockBeginning, maxBlockNum, blockLimit,
initialTxNum, inputTxNum, initialCycle, applyTx, accumulator, readAhead, logEvery)

lastCommittedBlockNum = pe.lastCommittedBlockNum
lastCommittedTxNum = pe.lastCommittedTxNum
lastCommittedBlockNum = pe.lastCommittedBlockNum.Load()
lastCommittedTxNum = pe.lastCommittedTxNum.Load()
} else {
se := &serialExecutor{
txExecutor: txExecutor{
cfg: cfg,
rs: rs,
doms: doms,
agg: agg,
u: u,
isBlockProduction: isBlockProduction,
isForkValidation: isForkValidation,
isApplyingBlocks: isApplyingBlocks,
applyTx: applyTx,
logger: logger,
logPrefix: execStage.LogPrefix(),
progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger),
enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle,
hooks: hooks,
lastCommittedTxNum: doms.TxNum(),
lastCommittedBlockNum: blockNum,
postValidator: postValidator,
cfg: cfg,
rs: rs,
doms: doms,
agg: agg,
u: u,
isBlockProduction: isBlockProduction,
isForkValidation: isForkValidation,
isApplyingBlocks: isApplyingBlocks,
applyTx: applyTx,
logger: logger,
logPrefix: execStage.LogPrefix(),
progress: NewProgress(blockNum, inputTxNum, commitThreshold, false, execStage.LogPrefix(), logger),
enableChaosMonkey: execStage.CurrentSyncCycle.IsInitialCycle,
hooks: hooks,
postValidator: postValidator,
}}
se.lastCommittedTxNum.Store(doms.TxNum())
se.lastCommittedBlockNum.Store(blockNum)

defer func() {
se.LogComplete(stepsInDb)
Expand All @@ -302,9 +302,9 @@ func ExecV3(ctx context.Context,
return err
}

se.lastCommittedBlockNum = lastHeader.Number.Uint64()
committedTransactions := se.domains().TxNum() - se.lastCommittedTxNum
se.lastCommittedTxNum = se.domains().TxNum()
se.lastCommittedBlockNum.Store(lastHeader.Number.Uint64())
committedTransactions := se.domains().TxNum() - se.lastCommittedTxNum.Load()
se.lastCommittedTxNum.Store(se.domains().TxNum())

commitStart := time.Now()
stepsInDb = rawdbhelpers.IdxStepsCountV3(applyTx, applyTx.Debug().StepSize())
Expand Down Expand Up @@ -334,8 +334,8 @@ func ExecV3(ctx context.Context,
}
}

lastCommittedBlockNum = se.lastCommittedBlockNum
lastCommittedTxNum = se.lastCommittedTxNum
lastCommittedBlockNum = se.lastCommittedBlockNum.Load()
lastCommittedTxNum = se.lastCommittedTxNum.Load()
}

if false && !isForkValidation {
Expand Down Expand Up @@ -426,9 +426,9 @@ type txExecutor struct {
lastExecutedBlockNum atomic.Int64
lastExecutedTxNum atomic.Int64
executedGas atomic.Int64
lastCommittedBlockNum uint64
lastCommittedTxNum uint64
committedGas int64
lastCommittedBlockNum atomic.Uint64
lastCommittedTxNum atomic.Uint64
committedGas atomic.Int64

execLoopGroup *errgroup.Group

Expand Down
30 changes: 15 additions & 15 deletions execution/stagedsync/exec3_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (p *Progress) LogExecution(rs *state.StateV3, ex executor) {

curTaskGasPerSec := int64(float64(curTaskGas) / seconds)

uncommitedGas := uint64(te.executedGas.Load() - te.committedGas)
uncommitedGas := uint64(te.executedGas.Load() - te.committedGas.Load())
sizeEstimate := rs.SizeEstimate()

switch ex.(type) {
Expand Down Expand Up @@ -745,12 +745,12 @@ func (p *Progress) LogCommitments(rs *state.StateV3, ex executor, commitStart ti
currentTime := time.Now()
interval := currentTime.Sub(p.prevCommitTime)

committedGasSec := uint64(float64(te.committedGas-p.prevCommittedGas) / interval.Seconds())
committedGasSec := uint64(float64(te.committedGas.Load()-p.prevCommittedGas) / interval.Seconds())
var committedTxSec uint64
if te.lastCommittedTxNum > p.prevCommittedTxNum {
committedTxSec = uint64(float64(te.lastCommittedTxNum-p.prevCommittedTxNum) / interval.Seconds())
if te.lastCommittedTxNum.Load() > p.prevCommittedTxNum {
committedTxSec = uint64(float64(te.lastCommittedTxNum.Load()-p.prevCommittedTxNum) / interval.Seconds())
}
committedDiffBlocks := max(int64(te.lastCommittedBlockNum)-int64(p.prevCommittedBlockNum), 0)
committedDiffBlocks := max(int64(te.lastCommittedBlockNum.Load())-int64(p.prevCommittedBlockNum), 0)

var commitedBlockDur time.Duration

Expand Down Expand Up @@ -801,17 +801,17 @@ func (p *Progress) LogCommitments(rs *state.StateV3, ex executor, commitStart ti
"buf", common.ByteCount(uint64(rs.Domains().Metrics().CachePutSize + rs.Domains().Metrics().CacheGetSize)),
}

p.log("committed", suffix, te, rs, interval, te.lastCommittedBlockNum, committedDiffBlocks,
te.lastCommittedTxNum-p.prevCommittedTxNum, committedTxSec, committedGasSec, 0, stepsInDb, commitVals)
p.log("committed", suffix, te, rs, interval, te.lastCommittedBlockNum.Load(), committedDiffBlocks,
te.lastCommittedTxNum.Load()-p.prevCommittedTxNum, committedTxSec, committedGasSec, 0, stepsInDb, commitVals)

p.prevDomainMetrics = updateExecDomainMetrics(te.doms.Metrics(), p.prevDomainMetrics, interval, false)

p.prevCommitTime = currentTime

if te.lastCommittedTxNum > 0 {
p.prevCommittedTxNum = te.lastCommittedTxNum
p.prevCommittedGas = te.committedGas
p.prevCommittedBlockNum = te.lastCommittedBlockNum
if te.lastCommittedTxNum.Load() > 0 {
p.prevCommittedTxNum = te.lastCommittedTxNum.Load()
p.prevCommittedGas = te.committedGas.Load()
p.prevCommittedBlockNum = te.lastCommittedBlockNum.Load()
}
}

Expand All @@ -830,19 +830,19 @@ func (p *Progress) LogComplete(rs *state.StateV3, ex executor, stepsInDb float64
suffix = " serial"
}

gas := te.committedGas
gas := te.committedGas.Load()

if gas == 0 {
gas = te.executedGas.Load()
}

lastTxNum := te.lastCommittedTxNum
lastTxNum := te.lastCommittedTxNum.Load()

if lastTxNum == 0 {
lastTxNum = uint64(te.lastExecutedTxNum.Load())
}

lastBlockNum := te.lastCommittedBlockNum
lastBlockNum := te.lastCommittedBlockNum.Load()

if lastBlockNum == 0 {
lastBlockNum = uint64(te.lastExecutedBlockNum.Load())
Expand Down Expand Up @@ -891,7 +891,7 @@ func (p *Progress) log(mode string, suffix string, te *txExecutor, rs *state.Sta
if stepsInDb > 0 {
vals = append(vals, []any{
"stepsInDB", fmt.Sprintf("%.2f", stepsInDb),
"step", fmt.Sprintf("%.1f", float64(te.lastCommittedTxNum)/float64(te.agg.StepSize())),
"step", fmt.Sprintf("%.1f", float64(te.lastCommittedTxNum.Load())/float64(te.agg.StepSize())),
}...)
}

Expand Down
38 changes: 18 additions & 20 deletions execution/stagedsync/exec3_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
var uncommittedGas int64
var flushPending bool
var hasLoggedExecution bool
var hasLoggedCommittments bool
var hasLoggedCommittments atomic.Bool
var commitStart time.Time

var stepsInDb = rawdbhelpers.IdxStepsCountV3(rwTx, pe.agg.StepSize())
Expand Down Expand Up @@ -263,7 +263,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
if !dbg.BatchCommitments || shouldGenerateChangesets || lastBlockResult.BlockNum == maxBlockNum ||
applyResult.Exhausted != nil ||
pe.cfg.syncCfg.KeepExecutionProofs ||
(flushPending && lastBlockResult.BlockNum > pe.lastCommittedBlockNum) {
(flushPending && lastBlockResult.BlockNum > pe.lastCommittedBlockNum.Load()) {

resetExecGauges(ctx)

Expand Down Expand Up @@ -301,7 +301,7 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
commitedBlocks := uint64(math.Round(float64(uncommittedBlocks) * progress))

if commitedBlocks > prevCommitedBlocks {
hasLoggedCommittments = true
hasLoggedCommittments.Store(true)
pe.LogCommitments(commitStart,
commitedBlocks-prevCommitedBlocks,
committedTransactions-prevCommittedTransactions,
Expand All @@ -325,8 +325,8 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
return
case progress, ok := <-commitProgress:
if !ok {
if !hasLoggedCommittments || time.Since(lastCommitedLog) > logInterval/20 {
hasLoggedCommittments = true
if !hasLoggedCommittments.Load() || time.Since(lastCommitedLog) > logInterval/20 {
hasLoggedCommittments.Store(true)
pe.LogCommitments(commitStart,
uint64(uncommittedBlocks), uncommittedTransactions,
uint64(uncommittedGas), stepsInDb, lastProgress)
Expand Down Expand Up @@ -386,8 +386,8 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U

<-LogCommitmentsDone // make sure no async mutations by LogCommitments can happen at this point
// fix these here - they will contain estimates after commit logging
pe.txExecutor.lastCommittedBlockNum = lastBlockResult.BlockNum
pe.txExecutor.lastCommittedTxNum = lastBlockResult.lastTxNum
pe.txExecutor.lastCommittedBlockNum.Store(lastBlockResult.BlockNum)
pe.txExecutor.lastCommittedTxNum.Store(lastBlockResult.lastTxNum)
uncommittedBlocks = 0
uncommittedGas = 0
uncommittedTransactions = 0
Expand Down Expand Up @@ -440,8 +440,13 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
pe.LogExecution()
}

if !hasLoggedCommittments && !commitStart.IsZero() {
pe.LogCommitments(commitStart, pe.txExecutor.lastCommittedBlockNum, uncommittedTransactions, uint64(uncommittedGas), stepsInDb, lastProgress)
// Wait for all goroutines to complete before reading shared state
if err := pe.wait(ctx); err != nil {
return nil, rwTx, err
}

if !hasLoggedCommittments.Load() && !commitStart.IsZero() {
pe.LogCommitments(commitStart, pe.txExecutor.lastCommittedBlockNum.Load(), uncommittedTransactions, uint64(uncommittedGas), stepsInDb, lastProgress)
}

if execErr != nil {
Expand All @@ -450,17 +455,10 @@ func (pe *parallelExecutor) exec(ctx context.Context, execStage *StageState, u U
}
}

if err := pe.wait(ctx); err != nil {
if err = execStage.Update(rwTx, pe.lastCommittedBlockNum.Load()); err != nil {
return nil, rwTx, err
}

if err = execStage.Update(rwTx, pe.lastCommittedBlockNum); err != nil {
return nil, rwTx, err
}

if err = pe.wait(ctx); err != nil {
return nil, rwTx, fmt.Errorf("wait failed: %w", err)
}
return lastHeader, rwTx, execErr
}

Expand All @@ -475,9 +473,9 @@ func (pe *parallelExecutor) LogExecution() {
}

func (pe *parallelExecutor) LogCommitments(commitStart time.Time, committedBlocks uint64, committedTransactions uint64, committedGas uint64, stepsInDb float64, lastProgress commitment.CommitProgress) {
pe.committedGas += int64(committedGas)
pe.txExecutor.lastCommittedBlockNum += committedBlocks
pe.txExecutor.lastCommittedTxNum += committedTransactions
pe.committedGas.Add(int64(committedGas))
pe.txExecutor.lastCommittedBlockNum.Add(committedBlocks)
pe.txExecutor.lastCommittedTxNum.Add(committedTransactions)
pe.progress.LogCommitments(pe.rs.StateV3, pe, commitStart, stepsInDb, lastProgress)
if domainMetrics := pe.domains().LogMetrics(); len(domainMetrics) > 0 {
pe.logger.Info(fmt.Sprintf("[%s] domain reads", pe.logPrefix), domainMetrics...)
Expand Down
10 changes: 5 additions & 5 deletions execution/stagedsync/exec3_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (se *serialExecutor) exec(ctx context.Context, execStage *StageState, u Unw
return b.HeaderNoCopy(), rwTx, nil
}
resetCommitmentGauges(ctx)
se.txExecutor.lastCommittedBlockNum = b.NumberU64()
se.txExecutor.lastCommittedTxNum = inputTxNum
se.txExecutor.lastCommittedBlockNum.Store(b.NumberU64())
se.txExecutor.lastCommittedTxNum.Store(inputTxNum)
se.logger.Info(
"periodic commit check",
"block", se.doms.BlockNum(),
Expand Down Expand Up @@ -238,9 +238,9 @@ func (se *serialExecutor) LogExecution() {
}

func (se *serialExecutor) LogCommitments(commitStart time.Time, committedBlocks uint64, committedTransactions uint64, committedGas uint64, stepsInDb float64, lastProgress commitment.CommitProgress) {
se.committedGas += int64(committedGas)
se.txExecutor.lastCommittedBlockNum += committedBlocks
se.txExecutor.lastCommittedTxNum += committedTransactions
se.committedGas.Add(int64(committedGas))
se.txExecutor.lastCommittedBlockNum.Add(committedBlocks)
se.txExecutor.lastCommittedTxNum.Add(committedTransactions)
se.progress.LogCommitments(se.rs.StateV3, se, commitStart, stepsInDb, lastProgress)
}

Expand Down
Loading