Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ func stageHashState(db kv.RwDB, ctx context.Context, logger log.Logger) error {
}

func stageLogIndex(db kv.RwDB, ctx context.Context, logger log.Logger) error {
dirs, pm, historyV3 := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db)
dirs, pm, historyV3, chainConfig := datadir.New(datadirCli), fromdb.PruneMode(db), kvcfg.HistoryV3.FromDB(db), fromdb.ChainConfig(db)
if historyV3 {
return fmt.Errorf("this stage is disable in --history.v3=true")
}
Expand Down Expand Up @@ -1139,7 +1139,7 @@ func stageLogIndex(db kv.RwDB, ctx context.Context, logger log.Logger) error {
logger.Info("Stage exec", "progress", execAt)
logger.Info("Stage", "name", s.ID, "progress", s.BlockNumber)

cfg := stagedsync.StageLogIndexCfg(db, pm, dirs.Tmp)
cfg := stagedsync.StageLogIndexCfg(db, pm, dirs.Tmp, chainConfig.NoPruneContracts)
if unwind > 0 {
u := sync.NewUnwindState(stages.LogIndex, s.BlockNumber-unwind, s.BlockNumber)
err = stagedsync.UnwindLogIndex(u, s, tx, cfg, ctx)
Expand Down
6 changes: 5 additions & 1 deletion erigon-lib/chain/chain_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,15 @@ type Config struct {
Clique *CliqueConfig `json:"clique,omitempty"`
Aura *AuRaConfig `json:"aura,omitempty"`
Bor *BorConfig `json:"bor,omitempty"`

//For not pruning these contracts (e.g. depsoit contract logs needed by CL)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add here more comments. if you know more information now. like: "why CL need it" and "in which mode (archive/non-archive) CL need it" and maybe something else.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I am only aware that both modes need it initially for constructing deposit data. Deeper dive into CL logic may be needed for me. And here I am just trying to preserve the logs, by default, irrespective of the specification of prune= flag.

I must run a validator test for this, although I am getting the desired results through RPC, the CL may have more expectations that I am not aware of.

There are also talks about Deposit snapshots which, then, would make this whole jutsu pointless

NoPruneContracts map[common.Address]bool `json:"noPruneContracts,omitempty"`
}

func (c *Config) String() string {
engine := c.getEngine()

return fmt.Sprintf("{ChainID: %v, Homestead: %v, DAO: %v, Tangerine Whistle: %v, Spurious Dragon: %v, Byzantium: %v, Constantinople: %v, Petersburg: %v, Istanbul: %v, Muir Glacier: %v, Berlin: %v, London: %v, Arrow Glacier: %v, Gray Glacier: %v, Terminal Total Difficulty: %v, Merge Netsplit: %v, Shanghai: %v, Cancun: %v, Prague: %v, Engine: %v}",
return fmt.Sprintf("{ChainID: %v, Homestead: %v, DAO: %v, Tangerine Whistle: %v, Spurious Dragon: %v, Byzantium: %v, Constantinople: %v, Petersburg: %v, Istanbul: %v, Muir Glacier: %v, Berlin: %v, London: %v, Arrow Glacier: %v, Gray Glacier: %v, Terminal Total Difficulty: %v, Merge Netsplit: %v, Shanghai: %v, Cancun: %v, Prague: %v, Engine: %v, NoPruneContracts: %v}",
c.ChainID,
c.HomesteadBlock,
c.DAOForkBlock,
Expand All @@ -108,6 +111,7 @@ func (c *Config) String() string {
c.CancunTime,
c.PragueTime,
engine,
c.NoPruneContracts,
)
}

Expand Down
24 changes: 20 additions & 4 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func executeBlock(
receipts = execRs.Receipts
stateSyncReceipt = execRs.StateSyncReceipt

if writeReceipts {
if writeReceipts || filterNoPruneReceipts(receipts, cfg.chainConfig).Len() > 0 {
if err = rawdb.AppendReceipts(tx, blockNum, receipts); err != nil {
return err
}
Expand All @@ -203,6 +203,21 @@ func executeBlock(
return nil
}

// Filters out receipts of contracts that may be needed by CL, such as deposit contract
func filterNoPruneReceipts(receipts types.Receipts, chainCfg *chain.Config) types.Receipts {
cr := types.Receipts{}
for _, r := range receipts {
for _, l := range r.Logs{
if chainCfg.NoPruneContracts[l.Address] {
cr = append(cr, r)
break
}
}
}
receipts = cr
return cr
}

func newStateReaderWriter(
batch kv.StatelessRwTx,
tx kv.RwTx,
Expand Down Expand Up @@ -892,10 +907,11 @@ func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx con
if err = rawdb.PruneTable(tx, kv.BorReceipts, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxUint32); err != nil {
return err
}
// EDIT: Don't prune yet, let LogIndex stage take care of it
// LogIndex.Prune will read everything what not pruned here
if err = rawdb.PruneTable(tx, kv.Log, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil {
return err
}
// if err = rawdb.PruneTable(tx, kv.Log, cfg.prune.Receipts.PruneTo(s.ForwardProgress), ctx, math.MaxInt32); err != nil {
// return err
// }
}
if cfg.prune.CallTraces.Enabled() {
if err = rawdb.PruneTableDupSort(tx, kv.CallTraceSet, logPrefix, cfg.prune.CallTraces.PruneTo(s.ForwardProgress), logEvery, ctx); err != nil {
Expand Down
46 changes: 27 additions & 19 deletions eth/stagedsync/stage_log_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,22 @@ const (
)

type LogIndexCfg struct {
tmpdir string
db kv.RwDB
prune prune.Mode
bufLimit datasize.ByteSize
flushEvery time.Duration
tmpdir string
db kv.RwDB
prune prune.Mode
bufLimit datasize.ByteSize
flushEvery time.Duration
noPruneContracts map[libcommon.Address]bool
}

func StageLogIndexCfg(db kv.RwDB, prune prune.Mode, tmpDir string) LogIndexCfg {
func StageLogIndexCfg(db kv.RwDB, prune prune.Mode, tmpDir string, noPruneContracts map[libcommon.Address]bool) LogIndexCfg {
return LogIndexCfg{
db: db,
prune: prune,
bufLimit: bitmapsBufLimit,
flushEvery: bitmapsFlushEvery,
tmpdir: tmpDir,
db: db,
prune: prune,
bufLimit: bitmapsBufLimit,
flushEvery: bitmapsFlushEvery,
tmpdir: tmpDir,
noPruneContracts: noPruneContracts,
}
}

Expand Down Expand Up @@ -80,14 +82,14 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

startBlock := s.BlockNumber
pruneTo := cfg.prune.Receipts.PruneTo(endBlock)
if startBlock < pruneTo {
startBlock = pruneTo
}
pruneTo := cfg.prune.Receipts.PruneTo(endBlock) //endBlock - prune.r.older
// if startBlock < pruneTo {
// startBlock = pruneTo
// }
if startBlock > 0 {
startBlock++
}
if err = promoteLogIndex(logPrefix, tx, startBlock, endBlock, cfg, ctx, logger); err != nil {
if err = promoteLogIndex(logPrefix, tx, startBlock, endBlock, pruneTo, cfg, ctx, logger); err != nil {
return err
}
if err = s.Update(tx, endBlock); err != nil {
Expand All @@ -103,7 +105,7 @@ func SpawnLogIndex(s *StageState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
return nil
}

func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, endBlock uint64, cfg LogIndexCfg, ctx context.Context, logger log.Logger) error {
func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, endBlock uint64, pruneBlock uint64, cfg LogIndexCfg, ctx context.Context, logger log.Logger) error {
quit := ctx.Done()
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()
Expand Down Expand Up @@ -175,6 +177,9 @@ func promoteLogIndex(logPrefix string, tx kv.RwTx, start uint64, endBlock uint64
}

for _, l := range ll {
if cfg.noPruneContracts != nil && !cfg.noPruneContracts[l.Address] && l.BlockNumber < pruneBlock {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for performance maybe better place l.BlockNumber < pruneBlock before map check

continue
}
for _, topic := range l.Topics {
topicStr := string(topic.Bytes())
m, ok := topics[topicStr]
Expand Down Expand Up @@ -405,7 +410,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
}

pruneTo := cfg.prune.Receipts.PruneTo(s.ForwardProgress)
if err = pruneLogIndex(logPrefix, tx, cfg.tmpdir, pruneTo, ctx, logger); err != nil {
if err = pruneLogIndex(logPrefix, tx, cfg.tmpdir, pruneTo, ctx, logger, cfg.noPruneContracts); err != nil {
return err
}
if err = s.Done(tx); err != nil {
Expand All @@ -420,7 +425,7 @@ func PruneLogIndex(s *PruneState, tx kv.RwTx, cfg LogIndexCfg, ctx context.Conte
return nil
}

func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneTo uint64, ctx context.Context, logger log.Logger) error {
func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneTo uint64, ctx context.Context, logger log.Logger, noPruneContracts map[libcommon.Address]bool) error {
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

Expand Down Expand Up @@ -461,6 +466,9 @@ func pruneLogIndex(logPrefix string, tx kv.RwTx, tmpDir string, pruneTo uint64,
}

for _, l := range logs {
if noPruneContracts != nil && noPruneContracts[l.Address] {
continue
}
for _, topic := range l.Topics {
if err := topics.Collect(topic.Bytes(), nil); err != nil {
return err
Expand Down
16 changes: 8 additions & 8 deletions eth/stagedsync/stage_log_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ func TestPromoteLogIndex(t *testing.T) {

expectAddrs, expectTopics := genReceipts(t, tx, 100)

cfg := StageLogIndexCfg(nil, prune.DefaultMode, "")
cfg := StageLogIndexCfg(nil, prune.DefaultMode, "", nil)
cfgCopy := cfg
cfgCopy.bufLimit = 10
cfgCopy.flushEvery = time.Nanosecond

err := promoteLogIndex("logPrefix", tx, 0, 0, cfgCopy, ctx, logger)
err := promoteLogIndex("logPrefix", tx, 0, 0, 0, cfgCopy, ctx, logger)
require.NoError(err)

// Check indices GetCardinality (in how many blocks they meet)
Expand All @@ -127,15 +127,15 @@ func TestPruneLogIndex(t *testing.T) {

_, _ = genReceipts(t, tx, 100)

cfg := StageLogIndexCfg(nil, prune.DefaultMode, "")
cfg := StageLogIndexCfg(nil, prune.DefaultMode, "", nil)
cfgCopy := cfg
cfgCopy.bufLimit = 10
cfgCopy.flushEvery = time.Nanosecond
err := promoteLogIndex("logPrefix", tx, 0, 0, cfgCopy, ctx, logger)
err := promoteLogIndex("logPrefix", tx, 0, 0, 0, cfgCopy, ctx, logger)
require.NoError(err)

// Mode test
err = pruneLogIndex("", tx, tmpDir, 50, ctx, logger)
err = pruneLogIndex("", tx, tmpDir, 50, ctx, logger, nil)
require.NoError(err)

{
Expand Down Expand Up @@ -167,15 +167,15 @@ func TestUnwindLogIndex(t *testing.T) {

expectAddrs, expectTopics := genReceipts(t, tx, 100)

cfg := StageLogIndexCfg(nil, prune.DefaultMode, "")
cfg := StageLogIndexCfg(nil, prune.DefaultMode, "", nil)
cfgCopy := cfg
cfgCopy.bufLimit = 10
cfgCopy.flushEvery = time.Nanosecond
err := promoteLogIndex("logPrefix", tx, 0, 0, cfgCopy, ctx, logger)
err := promoteLogIndex("logPrefix", tx, 0, 0, 0, cfgCopy, ctx, logger)
require.NoError(err)

// Mode test
err = pruneLogIndex("", tx, tmpDir, 50, ctx, logger)
err = pruneLogIndex("", tx, tmpDir, 50, ctx, logger, nil)
require.NoError(err)

// Unwind test
Expand Down
Loading