Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
b8d919b
save
Giulio2002 Jul 16, 2024
8ba2cba
save
Giulio2002 Jul 16, 2024
0318a39
save
Giulio2002 Jul 16, 2024
cdc753d
save
Giulio2002 Jul 16, 2024
a372b4f
save
Giulio2002 Jul 16, 2024
d1f1e3d
save
Giulio2002 Jul 16, 2024
d0d21f0
save
Giulio2002 Jul 16, 2024
219020c
save
Giulio2002 Jul 16, 2024
cdd3392
save
Giulio2002 Jul 16, 2024
3ce0d86
save
Giulio2002 Jul 16, 2024
16df018
save
Giulio2002 Jul 16, 2024
9f6b6b2
save
Giulio2002 Jul 16, 2024
930fc93
save
Giulio2002 Jul 16, 2024
6f311b4
save
Giulio2002 Jul 16, 2024
5cd4e19
save
Giulio2002 Jul 16, 2024
0094f56
save
Giulio2002 Jul 16, 2024
ed8e38a
save
Giulio2002 Jul 16, 2024
3ac67d6
save
Giulio2002 Jul 16, 2024
c64bc9e
save
Giulio2002 Jul 16, 2024
4e22685
save
Giulio2002 Jul 16, 2024
23d3c45
save
Giulio2002 Jul 16, 2024
302b04b
save
Giulio2002 Jul 16, 2024
b5ca96c
save
Giulio2002 Jul 17, 2024
5f1fcf4
save
Giulio2002 Jul 17, 2024
ba82795
Merge remote-tracking branch 'origin/main' into HEAD
Giulio2002 Jul 17, 2024
fe2d671
save
Giulio2002 Jul 17, 2024
f442e2e
save
Giulio2002 Jul 17, 2024
06afbd5
save
Giulio2002 Jul 17, 2024
294865b
save
Giulio2002 Jul 17, 2024
cf37670
save
Giulio2002 Jul 18, 2024
76a664e
save
Giulio2002 Jul 18, 2024
7f65033
save
Giulio2002 Jul 18, 2024
c991853
save
Giulio2002 Jul 18, 2024
1553906
Merge branch 'main' into good-prune
Giulio2002 Jul 18, 2024
69c0b0e
save
Giulio2002 Jul 18, 2024
64af438
save
Giulio2002 Jul 18, 2024
8ada3d1
save
Giulio2002 Jul 18, 2024
d0735e9
save
Giulio2002 Jul 18, 2024
8f5acd4
save
Giulio2002 Jul 18, 2024
5320d7f
save
Giulio2002 Jul 18, 2024
8523a5c
save
Giulio2002 Jul 18, 2024
1651ee6
Merge remote-tracking branch 'origin/main' into HEAD
Giulio2002 Jul 18, 2024
1e6030b
save
Giulio2002 Jul 18, 2024
2698ef7
save
Giulio2002 Jul 18, 2024
bb81869
save
Giulio2002 Jul 18, 2024
692eb59
save
Giulio2002 Jul 18, 2024
b648f74
save
Giulio2002 Jul 18, 2024
4af3bac
save
Giulio2002 Jul 18, 2024
678416b
save
Giulio2002 Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,9 +1017,6 @@ func stageExec(db kv.RwDB, ctx context.Context, logger log.Logger) error {
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
Expand Down Expand Up @@ -1123,9 +1120,6 @@ func stageCustomTrace(db kv.RwDB, ctx context.Context, logger log.Logger) error
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}

syncCfg := ethconfig.Defaults.Sync
Expand Down Expand Up @@ -1244,9 +1238,6 @@ func stageTxLookup(db kv.RwDB, ctx context.Context, logger log.Logger) error {
s := stage(sync, tx, nil, stages.TxLookup)
if pruneTo > 0 {
pm.History = prune.Distance(s.BlockNumber - pruneTo)
pm.Receipts = prune.Distance(s.BlockNumber - pruneTo)
pm.CallTraces = prune.Distance(s.BlockNumber - pruneTo)
pm.TxIndex = prune.Distance(s.BlockNumber - pruneTo)
}
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

Expand Down Expand Up @@ -1547,8 +1538,7 @@ func stage(st *stagedsync.Sync, tx kv.Tx, db kv.RoDB, stage stages.SyncStage) *s

func overrideStorageMode(db kv.RwDB, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneFlag, pruneB, pruneH, pruneR, pruneT, pruneC,
pruneHBefore, pruneRBefore, pruneTBefore, pruneCBefore, pruneBBefore, experiments)
pm, err := prune.FromCli(chainConfig.ChainID.Uint64(), pruneB, pruneH, experiments)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@ func (a *Aggregator) DiscardHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = true
return a
}

func (a *Aggregator) DiscardInvertedIndex(name kv.InvertedIdxPos) *Aggregator {
a.iis[name].discardIdx = true
return a
}

func (a *Aggregator) EnableHistory(name kv.Domain) *Aggregator {
a.d[name].historyDisabled = false
return a
Expand Down
3 changes: 3 additions & 0 deletions erigon-lib/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,7 @@ func (dt *DomainRoTx) DomainRangeLatest(roTx kv.Tx, fromKey, toKey []byte, limit
func (dt *DomainRoTx) CanPruneUntil(tx kv.Tx, untilTx uint64) bool {
canDomain, _ := dt.canPruneDomainTables(tx, untilTx)
canHistory, _ := dt.ht.canPruneUntil(tx, untilTx)
fmt.Println("CanPruneUntil", canDomain, canHistory)
return canHistory || canDomain
}

Expand All @@ -1670,6 +1671,7 @@ func (dt *DomainRoTx) canPruneDomainTables(tx kv.Tx, untilTx uint64) (can bool,
dt.d.logger.Error("get domain pruning progress", "name", dt.d.filenameBase, "error", err)
return false, maxStepToPrune
}
fmt.Println(sm, maxStepToPrune, untilStep)

delta := float64(max(maxStepToPrune, sm) - min(maxStepToPrune, sm)) // maxStep could be 0
switch dt.d.filenameBase {
Expand Down Expand Up @@ -1739,6 +1741,7 @@ func (dt *DomainRoTx) Prune(ctx context.Context, rwTx kv.RwTx, step, txFrom, txT
return nil, fmt.Errorf("prune history at step %d [%d, %d): %w", step, txFrom, txTo, err)
}
canPrune, maxPrunableStep := dt.canPruneDomainTables(rwTx, txTo)
fmt.Println("dt.canPruneDomainTables", canPrune, maxPrunableStep)
if !canPrune {
return stat, nil
}
Expand Down
5 changes: 3 additions & 2 deletions erigon-lib/state/inverted_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ type InvertedIndex struct {
// fields for history write
logger log.Logger

noFsync bool // fsync is enabled by default, but tests can manually disable
noFsync bool // fsync is enabled by default, but tests can manually disable
discardIdx bool

compression FileCompression
compressWorkers int
Expand Down Expand Up @@ -381,7 +382,7 @@ func (w *invertedIndexBufferedWriter) Add(key []byte) error {
}

func (iit *InvertedIndexRoTx) NewWriter() *invertedIndexBufferedWriter {
return iit.newWriter(iit.ii.dirs.Tmp, false)
return iit.newWriter(iit.ii.dirs.Tmp, iit.ii.discardIdx)
}

type invertedIndexBufferedWriter struct {
Expand Down
30 changes: 19 additions & 11 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,21 @@ func (p *Progress) Log(rs *state.StateV3, in *state.QueueWithRetry, rws *state.R
interval := currentTime.Sub(p.prevTime)
speedTx := float64(doneCount-p.prevCount) / (float64(interval) / float64(time.Second))
//var repeatRatio float64
//if doneCount > p.prevCount {
// repeatRatio = 100.0 * float64(repeatCount-p.prevRepeatCount) / float64(doneCount-p.prevCount)
//}
p.logger.Info(fmt.Sprintf("[%s] Transaction replay", p.logPrefix),
//"workers", workerCount,
logArgs := []interface{}{
"blk", outputBlockNum,
"tx/s", fmt.Sprintf("%.1f", speedTx),
//"pipe", fmt.Sprintf("(%d+%d)->%d/%d->%d/%d", in.NewTasksLen(), in.RetriesLen(), rws.ResultChLen(), rws.ResultChCap(), rws.Len(), rws.Limit()),
//"repeatRatio", fmt.Sprintf("%.2f%%", repeatRatio),
//"workers", p.workersCount,
"buffer", fmt.Sprintf("%s/%s", common.ByteCount(sizeEstimate), common.ByteCount(p.commitThreshold)),
"stepsInDB", fmt.Sprintf("%.2f", idxStepsAmountInDB),
"step", fmt.Sprintf("%.1f", float64(outTxNum)/float64(config3.HistoryV3AggregationStep)),
"alloc", common.ByteCount(m.Alloc), "sys", common.ByteCount(m.Sys),
)
}
if idxStepsAmountInDB > 0.01 {
logArgs = append(logArgs, "stepsInDB", fmt.Sprintf("%.2f", idxStepsAmountInDB))
}

p.logger.Info(fmt.Sprintf("[%s] Transaction replay", p.logPrefix), logArgs...)

p.prevTime = currentTime
p.prevCount = doneCount
Expand Down Expand Up @@ -192,15 +192,23 @@ func ExecV3(ctx context.Context,
if initialCycle {
agg.SetCollateAndBuildWorkers(min(2, estimate.StateV3Collate.Workers()))
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
//if blockNum < cfg.blockReader.FrozenBlocks() {
//defer agg.DiscardHistory(kv.CommitmentDomain).EnableHistory(kv.CommitmentDomain)
//defer agg.LimitRecentHistoryWithoutFiles(0).LimitRecentHistoryWithoutFiles(agg.StepSize() / 10)
//}
} else {
agg.SetCompressWorkers(1)
agg.SetCollateAndBuildWorkers(1)
}

pruneNonEssentials := cfg.prune.History.Enabled() && cfg.prune.History.PruneTo(execStage.BlockNumber) == execStage.BlockNumber
// Disable all inverted indexes if we do max pruning
if pruneNonEssentials {
agg.DiscardInvertedIndex(kv.LogAddrIdxPos)
agg.DiscardInvertedIndex(kv.LogTopicIdxPos)
agg.DiscardInvertedIndex(kv.TracesFromIdxPos)
agg.DiscardInvertedIndex(kv.TracesToIdxPos)
// We do not discard account because it's used in snapshot generation.
agg.DiscardHistory(kv.StorageDomain)
agg.DiscardHistory(kv.CodeDomain)
}

var err error
inMemExec := txc.Doms != nil
var doms *state2.SharedDomains
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_call_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@ func PruneCallTraces(s *PruneState, tx kv.RwTx, cfg CallTracesCfg, ctx context.C
defer tx.Rollback()
}

if cfg.prune.CallTraces.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.CallTraces.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
if cfg.prune.History.Enabled() {
if err = pruneCallTraces(tx, logPrefix, cfg.prune.History.PruneTo(s.ForwardProgress), ctx, cfg.tmpdir, logger); err != nil {
return err
}
}
Expand Down
6 changes: 3 additions & 3 deletions eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

startBlock := s.BlockNumber
pruneTo := cfg.prune.Receipts.PruneTo(endBlock) //endBlock - prune.r.older
pruneTo := cfg.prune.History.PruneTo(endBlock) //endBlock - prune.r.older
// if startBlock < pruneTo {
// startBlock = pruneTo
// }
Expand Down Expand Up @@ -435,7 +435,7 @@ func pruneOldLogChunks(tx kv.RwTx, bucket string, inMem *etl.Collector, pruneTo

// Call pruneLogIndex with the current sync progresses and commit the data to db
func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Context, logger log.Logger) (err error) {
if !cfg.prune.Receipts.Enabled() {
if !cfg.prune.History.Enabled() {
return nil
}
logPrefix := s.LogPrefix()
Expand All @@ -449,7 +449,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
defer tx.Rollback()
}

pruneTo := cfg.prune.Receipts.PruneTo(s.ForwardProgress)
pruneTo := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = pruneLogIndex(logPrefix, tx, cfg.tmpdir, s.PruneProgress, pruneTo, ctx, logger, cfg.depositContract); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ func PruneSendersStage(s *PruneState, tx kv.RwTx, cfg SendersCfg, ctx context.Co
}
if cfg.blockReader.FreezingCfg().Enabled {
// noop. in this case senders will be deleted by BlockRetire.PruneAncientBlocks after data-freezing.
} else if cfg.prune.TxIndex.Enabled() {
to := cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
} else if cfg.prune.History.Enabled() {
to := cfg.prune.History.PruneTo(s.ForwardProgress)
if err = rawdb.PruneTable(tx, kv.Senders, to, ctx, 100); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions eth/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func SpawnTxLookup(s *StageState, tx kv.RwTx, toBlock uint64, cfg TxLookupCfg, c
}

startBlock := s.BlockNumber
if cfg.prune.TxIndex.Enabled() {
pruneTo := cfg.prune.TxIndex.PruneTo(endBlock)
if cfg.prune.History.Enabled() {
pruneTo := cfg.prune.History.PruneTo(endBlock)
if startBlock < pruneTo {
startBlock = pruneTo
if err = s.UpdatePrune(tx, pruneTo); err != nil { // prune func of this stage will use this value to prevent all ancient blocks traversal
Expand Down Expand Up @@ -248,8 +248,8 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
var pruneBor bool

// Forward stage doesn't write anything before PruneTo point
if cfg.prune.TxIndex.Enabled() {
blockTo = cfg.prune.TxIndex.PruneTo(s.ForwardProgress)
if cfg.prune.History.Enabled() {
blockTo = cfg.prune.History.PruneTo(s.ForwardProgress)
pruneBor = true
} else if cfg.blockReader.FreezingCfg().Enabled {
blockTo = cfg.blockReader.CanPruneTo(s.ForwardProgress)
Expand Down
Loading