diff --git a/cli/smartcontract/permission.go b/cli/smartcontract/permission.go index 51071509b1..9f10edff0e 100644 --- a/cli/smartcontract/permission.go +++ b/cli/smartcontract/permission.go @@ -51,9 +51,9 @@ func (p permission) MarshalYAML() (any, error) { return m, nil } -func (p *permission) UnmarshalYAML(unmarshal func(any) error) error { +func (p *permission) UnmarshalYAML(node *yaml.Node) error { var m map[string]any - if err := unmarshal(&m); err != nil { + if err := node.Decode(&m); err != nil { return err } diff --git a/docs/node-configuration.md b/docs/node-configuration.md index e7634236f4..9647e80e9c 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -33,13 +33,13 @@ node-related settings described in the table below. | Prometheus | [Metrics Services Configuration](#Metrics-Services-Configuration) | | Configuration for Prometheus (monitoring system). See the [Metrics Services Configuration](#Metrics-Services-Configuration) section for details | | Relay | `bool` | `true` | Determines whether the server is forwarding its inventory. | | Consensus | [Consensus Configuration](#Consensus-Configuration) | | Describes consensus (dBFT) configuration. See the [Consensus Configuration](#Consensus-Configuration) for details. | -| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only the last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. If enabled along with `P2PStateExchangeExtensions` protocol extension, then old blocks and MPT states will be removed up to the second latest state synchronisation point (see `StateSyncInterval`). | -| RemoveUntraceableHeaders | `bool`| `false` | Used only with RemoveUntraceableBlocks and makes node delete untraceable block headers as well. Notice that this is an experimental option, not recommended for production use. | +| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks, headers, transactions, execution results and transfer logs should be removed from cache and database. If enabled, then only the last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. If enabled along with `P2PStateExchangeExtensions` protocol extension, then old blocks and MPT states will be removed up to the second latest state synchronisation point (see `StateSyncInterval`). | | RPC | [RPC Configuration](#RPC-Configuration) | | Describes [RPC subsystem](rpc.md) configuration. See the [RPC Configuration](#RPC-Configuration) for details. | | SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. | | SkipBlockVerification | `bool` | `false` | Allows to disable verification of received/processed blocks (including cryptographic checks). | | StateRoot | [State Root Configuration](#State-Root-Configuration) | | State root module configuration. See the [State Root Configuration](#State-Root-Configuration) section for details. | | SaveInvocations | `bool` | `false` | Determines if additional smart contract invocation details are stored. If enabled, the `getapplicationlog` RPC method will return a new field with invocation details for the transaction. See the [RPC](rpc.md#applicationlog-invocations) documentation for more information. 
| +| TrustedHeader | `map[uint32]Hash256` | `nil` | Determines the header (a height mapped to the header hash in the LE form) to start light node synchronization from. Headers below the trusted height won't be fetched or processed at all. Requires `RemoveUntraceableBlocks` to be enabled along with one of `P2PStateExchangeExtensions` or `NeoFSStateSyncExtensions`. | ### P2P Configuration diff --git a/pkg/config/application_config.go b/pkg/config/application_config.go index 2efa6f405d..d7ea935351 100644 --- a/pkg/config/application_config.go +++ b/pkg/config/application_config.go @@ -166,5 +166,8 @@ func (a *ApplicationConfiguration) Validate() error { if err := a.Logger.Validate(); err != nil { return fmt.Errorf("invalid logger config: %w", err) } + if err := a.Ledger.Validate(); err != nil { + return fmt.Errorf("invalid ledger config: %w", err) + } return nil } diff --git a/pkg/config/application_config_test.go b/pkg/config/application_config_test.go index 02fa25cd9d..3169235eae 100644 --- a/pkg/config/application_config_test.go +++ b/pkg/config/application_config_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neo-go/pkg/util" "github.com/stretchr/testify/require" ) @@ -233,13 +234,24 @@ func TestApplicationConfiguration_Validate(t *testing.T) { shouldFail: true, errMsg: "invalid logger config: invalid LogEncoding: unknown", }, + { + cfg: ApplicationConfiguration{ + Ledger: Ledger{ + TrustedHeader: HashIndex{ + Hash: util.Uint256{1, 2, 3}, + Index: 4, + }, + }, + }, + shouldFail: true, + errMsg: "TrustedHeader is set, but RemoveUntraceableBlocks is disabled", + }, } for _, c := range cases { err := c.cfg.Validate() if c.shouldFail { - require.Error(t, err) - require.Contains(t, err.Error(), c.errMsg) + require.ErrorContains(t, err, c.errMsg) } else { require.NoError(t, err) } diff --git a/pkg/config/genesis_extensions.go b/pkg/config/genesis_extensions.go index 8bb8c6093b..47cd2fbef1 100644 --- a/pkg/config/genesis_extensions.go +++ b/pkg/config/genesis_extensions.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/native/noderoles" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "gopkg.in/yaml.v3" ) // Genesis represents a set of genesis block settings including the extensions @@ -85,9 +86,9 @@ func (e Genesis) MarshalYAML() (any, error) { } // UnmarshalYAML implements the YAML unmarshaler interface. -func (e *Genesis) UnmarshalYAML(unmarshal func(any) error) error { +func (e *Genesis) UnmarshalYAML(node *yaml.Node) error { var aux genesisAux - if err := unmarshal(&aux); err != nil { + if err := node.Decode(&aux); err != nil { return err } diff --git a/pkg/config/ledger_config.go b/pkg/config/ledger_config.go index b8a6c68978..d09f8c42db 100644 --- a/pkg/config/ledger_config.go +++ b/pkg/config/ledger_config.go @@ -1,5 +1,13 @@ package config +import ( + "errors" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/util" + "gopkg.in/yaml.v3" +) + // Ledger contains core node-specific settings that are not // a part of the ProtocolConfiguration (which is common for every node on the // network). @@ -12,11 +20,10 @@ type Ledger struct { // If true, DB size will be smaller, but older roots won't be accessible. // This value should remain the same for the same database. KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"` - // RemoveUntraceableBlocks specifies if old data should be removed. + // RemoveUntraceableBlocks specifies if old data (blocks, headers, + // transactions, execution results, transfer logs and MPT data) should be + // removed. 
RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"` - // RemoveUntraceableHeaders is used in addition to RemoveUntraceableBlocks - // when headers need to be removed as well. - RemoveUntraceableHeaders bool `yaml:"RemoveUntraceableHeaders"` // SaveStorageBatch enables storage batch saving before every persist. SaveStorageBatch bool `yaml:"SaveStorageBatch"` // SkipBlockVerification allows to disable verification of received @@ -24,6 +31,17 @@ type Ledger struct { SkipBlockVerification bool `yaml:"SkipBlockVerification"` // SaveInvocations enables smart contract invocation data saving. SaveInvocations bool `yaml:"SaveInvocations"` + // TrustedHeader is an index/hash of header that can be used to start + // light node headers synchronisation from (without additional verification). + // It's valid iff RemoveUntraceableBlocks is enabled along with one of + // P2PStateExchangeExtensions or NeoFSStateSyncExtensions. + TrustedHeader HashIndex `yaml:"TrustedHeader"` +} + +// HashIndex is a structure representing hash and index of block/header. +type HashIndex struct { + Hash util.Uint256 + Index uint32 } // Blockchain is a set of settings for core.Blockchain to use, it includes protocol @@ -34,3 +52,45 @@ type Blockchain struct { NeoFSBlockFetcher NeoFSStateFetcher } + +// Validate checks Ledger for internal consistency and returns an error if any +// invalid settings are found. +func (l Ledger) Validate() error { + if l.TrustedHeader.Index != 0 && !l.RemoveUntraceableBlocks { + return errors.New("TrustedHeader is set, but RemoveUntraceableBlocks is disabled") + } + return nil +} + +// MarshalYAML implements the YAML marshaller interface. +func (h HashIndex) MarshalYAML() (any, error) { + var startSyncFrom = make(map[uint32]util.Uint256) + if h.Index != 0 { + startSyncFrom[h.Index] = h.Hash + } + return startSyncFrom, nil +} + +// UnmarshalYAML implements the YAML Unmarshaler interface. 
+func (h *HashIndex) UnmarshalYAML(node *yaml.Node) error { + var aux map[uint32]util.Uint256 + + err := node.Decode(&aux) + if err != nil { + return err + } + if len(aux) > 1 { + return fmt.Errorf("only one trusted height is supported, got %d entries", len(aux)) + } + + if len(aux) > 0 { + for i, hh := range aux { + *h = HashIndex{ + Hash: hh, + Index: i, + } + } + } + + return nil +} diff --git a/pkg/config/ledger_config_test.go b/pkg/config/ledger_config_test.go new file mode 100644 index 0000000000..626ab05ee7 --- /dev/null +++ b/pkg/config/ledger_config_test.go @@ -0,0 +1,27 @@ +package config + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/internal/testserdes" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestHashIndex_MarshalUnmarshalYAML(t *testing.T) { + t.Run("good", func(t *testing.T) { + testserdes.MarshalUnmarshalYAML(t, &HashIndex{ + Hash: util.Uint256{1, 2, 3}, + Index: 1, + }, new(HashIndex)) + }) + t.Run("empty", func(t *testing.T) { + testserdes.MarshalUnmarshalYAML(t, &HashIndex{}, new(HashIndex)) + }) + t.Run("multiple heights", func(t *testing.T) { + require.ErrorContains(t, yaml.Unmarshal([]byte(` +1: `+util.Uint256{1, 2, 3}.String()+` +2: `+util.Uint256{1, 2, 3}.String()), new(HashIndex)), "only one trusted height is supported") + }) +} diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 1e87bb0e21..2153ded1d8 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -185,6 +185,12 @@ type Blockchain struct { // removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC()) gcBlockTimes *lru.Cache[uint32, uint64] + // gcLastUntraceableBlockHeight is the height of the latest untraceable block + // that was removed by GC. Storing this value is cheaper than performing Seek + // through all blocks every time we need to define the next header to be + // removed. + gcLastUntraceableBlockHeight uint32 + // Stop synchronization mechanisms. 
stopCh chan struct{} runToExitCh chan struct{} @@ -321,6 +327,12 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions { return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously") } + if cfg.TrustedHeader.Index > 0 && !(cfg.P2PStateExchangeExtensions || cfg.NeoFSStateSyncExtensions) { + return nil, errors.New("TrustedHeader can not be used without P2PStateExchangeExtensions or NeoFSStateSyncExtensions") + } + if cfg.TrustedHeader.Index == 0 && (cfg.P2PStateExchangeExtensions || cfg.NeoFSStateSyncExtensions) { + log.Info("TrustedHeader is not set, headers synchronisation will start from latest stored header") + } if cfg.P2PStateExchangeExtensions { if !cfg.StateRootInHeader { return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") @@ -347,9 +359,6 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl zap.Int("StateSyncInterval", cfg.StateSyncInterval)) } } - if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks { - return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not") - } if cfg.Hardforks == nil { cfg.Hardforks = map[string]uint32{} for _, hf := range config.StableHardforks { @@ -495,7 +504,19 @@ func (bc *Blockchain) init() error { if err != nil { return err } - bc.HeaderHashes.initGenesis(bc.dao, genesisBlock.Hash()) + var trusted = config.HashIndex{ + Hash: genesisBlock.Hash(), + Index: 0, + } + if bc.config.TrustedHeader.Index > 0 { + minTrustedHeight := max(uint32(2*bc.config.StateSyncInterval), bc.GetMaxTraceableBlocks()) + if (bc.config.P2PStateExchangeExtensions || bc.config.NeoFSStateSyncExtensions) && + bc.config.TrustedHeader.Index <= minTrustedHeight { + return fmt.Errorf("trusted header is too low to start state synchronization: need at least %d, got %d", minTrustedHeight, bc.config.TrustedHeader.Index) + } + trusted = bc.config.TrustedHeader + } + bc.HeaderHashes.initMinTrustedHeader(bc.dao, trusted) if err := bc.stateRoot.Init(0); err != nil { return fmt.Errorf("can't init MPT: %w", err) } @@ -536,7 +557,7 @@ func (bc *Blockchain) init() error { // and the genesis block as first block. bc.log.Info("restoring blockchain", zap.String("version", version)) - err = bc.HeaderHashes.init(bc.dao) + err = bc.HeaderHashes.init(bc.dao, bc.config.TrustedHeader) if err != nil { return err } @@ -570,6 +591,13 @@ func (bc *Blockchain) init() error { bc.blockHeight = bHeight bc.persistedHeight = bHeight + bc.dao.Store.Seek(storage.SeekRange{ + Prefix: []byte{byte(storage.IXHeaderHashList)}, + }, func(k, _ []byte) bool { + bc.gcLastUntraceableBlockHeight = binary.BigEndian.Uint32(k[1:]) + return false + }) + bc.log.Debug("initializing caches", zap.Uint32("blockHeight", bHeight)) if err = bc.stateRoot.Init(bHeight); err != nil { return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err) @@ -700,7 +728,12 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro // After current state is updated, we need to remove outdated state-related data if so. // The only outdated data we might have is genesis-related data, so check it. if int(p)-int(mtb) > 0 { - _, err := cache.DeleteBlock(bc.GetHeaderHash(0), false) + // bc.HeaderHashes does not contain genesis hash since old hashes are removed with RUB. 
+ genesisBlock, err := CreateGenesisBlock(bc.config.ProtocolConfiguration) + if err != nil { + return fmt.Errorf("failed to retrieve genesis block hash: %w", err) + } + _, err = cache.DeleteBlock(genesisBlock.Hash()) if err != nil { return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err) } @@ -774,7 +807,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro // resetRAMState resets in-memory cached info. func (bc *Blockchain) resetRAMState(height uint32, resetHeaders bool) error { if resetHeaders { - err := bc.HeaderHashes.init(bc.dao) + err := bc.HeaderHashes.init(bc.dao, config.HashIndex{}) if err != nil { return err } @@ -929,7 +962,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keysCnt = new(int) ) for i := height + 1; i <= currHeight; i++ { - _, err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false) + _, err := upperCache.DeleteBlock(bc.GetHeaderHash(i)) if err != nil { return fmt.Errorf("error while removing block %d: %w", i, err) } @@ -1070,7 +1103,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) for i := height + 1; i <= hHeight; i++ { upperCache.PurgeHeader(bc.GetHeaderHash(i)) } - upperCache.DeleteHeaderHashes(height+1, headerBatchCount) + upperCache.DeleteHeaderHashesHead(height+1, headerBatchCount) upperCache.StoreAsCurrentBlock(b) upperCache.PutCurrentHeader(b.Hash(), height) v.StoragePrefix = statesync.TemporaryPrefix(v.StoragePrefix) @@ -1145,9 +1178,9 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage) keys := 0 err = bc.store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(statesync.TemporaryPrefix(v.StoragePrefix))}, - }, func(_, _ []byte) bool { + }, func(_, _ []byte) (bool, bool) { keys++ - return false + return false, true }) if err != nil { return fmt.Errorf("faield to remove stale storage items from DB: %w", err) @@ -1263,10 +1296,13 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration { tgtBlock -= int64(mtb) if bc.config.P2PStateExchangeExtensions { + // Old blocks should be removed up to P2-MaxTraceableBlocks which is required for + // proper P2P state synchronization, hence align removal of transfers, MPT entries, + // blocks and header hashes with this value. syncP := newHeight / uint32(bc.config.StateSyncInterval) syncP-- syncP *= uint32(bc.config.StateSyncInterval) - tgtBlock = min(tgtBlock, int64(syncP)) + tgtBlock = min(tgtBlock, int64(syncP-mtb)) } // Always round to the GCP. tgtBlock /= int64(bc.config.Ledger.GarbageCollectionPeriod) @@ -1277,6 +1313,8 @@ func (bc *Blockchain) tryRunGC(oldHeight uint32) time.Duration { if tgtBlock > int64(bc.config.Ledger.GarbageCollectionPeriod) && newHeight != oldHeight { dur = bc.removeOldTransfers(uint32(tgtBlock)) dur += bc.stateRoot.GC(uint32(tgtBlock), bc.store) + dur += bc.removeUntraceableBlocks(newHeight, uint32(tgtBlock)) + dur += bc.removeOldHeaderHashes(uint32(tgtBlock)) } return dur } @@ -1443,7 +1481,7 @@ func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { err = bc.store.SeekGC(storage.SeekRange{ Prefix: prefixes[i : i+1], Backwards: true, // From new to old. - }, func(k, v []byte) bool { + }, func(k, v []byte) (bool, bool) { // We don't look inside of the batches, it requires too much effort, instead // we drop batches that are confirmed to contain outdated entries. 
var batchAcc util.Uint160 @@ -1454,13 +1492,13 @@ func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { acc = batchAcc } else if canDrop { // We've seen this account and all entries in this batch are guaranteed to be outdated. removed++ - return false + return false, true } // We don't know what's inside, so keep the current // batch anyway, but allow to drop older ones. canDrop = batchTs <= ts kept++ - return true + return true, true }) if err != nil { break @@ -1478,6 +1516,86 @@ func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration { return dur } +// removeOldHeaderHashes removes batches of header hashes starting from genesis up +// to the batch with header with the specified index (excluding this batch if some +// headers from this batch should not be removed). +func (bc *Blockchain) removeOldHeaderHashes(index uint32) time.Duration { + bc.log.Info("starting header hashes garbage collection", zap.Uint32("index", index)) + var ( + err error + removed int64 + start = time.Now() + till = ((int32(index)+1)/headerBatchCount - 1) * headerBatchCount + ) + if till > 0 { + err = bc.store.SeekGC(storage.SeekRange{ + Prefix: []byte{byte(storage.IXHeaderHashList)}, + }, func(k, _ []byte) (bool, bool) { + first := binary.BigEndian.Uint32(k[1:]) + if first <= uint32(till) { + removed += headerBatchCount + return false, first != uint32(till) + } + return true, false + }) + } + + dur := time.Since(start) + if err != nil { + bc.log.Error("failed to flush header hashes GC changeset", zap.Duration("time", dur), zap.Error(err)) + } else { + bc.log.Info("finished header hashes garbage collection", + zap.Int64("removed", removed), + zap.Duration("time", dur)) + } + return dur +} + +func (bc *Blockchain) removeUntraceableBlocks(newPeriod, tgtHeight uint32) time.Duration { + if newPeriod*bc.config.GarbageCollectionPeriod/headerBatchCount == tgtHeight/headerBatchCount { // current batch of header hashes is not stored yet. + tgtHeight = uint32(max(0, (int(tgtHeight)/headerBatchCount-1)*headerBatchCount)) + } + if tgtHeight == 0 { + return 0 + } + oldTgtHeight := bc.gcLastUntraceableBlockHeight + + bc.log.Info("starting untraceable blocks garbage collection", zap.Uint32("from", oldTgtHeight), zap.Uint32("to", tgtHeight)) + var ( + start = time.Now() + kvcache = bc.dao.GetPrivate() + removed int + ) + for index := oldTgtHeight; index < tgtHeight; index++ { + hh := bc.GetHeaderHash(index) + if hh.Equals(util.Uint256{}) { + // Empty hashes are allowed if recovering from trusted height. + continue + } + _, err := kvcache.DeleteBlock(hh) + if err != nil { + bc.log.Warn("error while removing old block", + zap.Uint32("index", index), + zap.Error(err)) + } else { + removed++ + } + } + bc.gcLastUntraceableBlockHeight = tgtHeight + + dur := time.Since(start) + _, err := kvcache.Persist() + if err != nil { + bc.log.Error("failed to flush untraceable blocks GC changeset", zap.Duration("time", dur), zap.Error(err)) + } else { + bc.log.Info("finished untraceable blocks garbage collection", + zap.Int("removed", removed), + zap.Duration("time", dur)) + } + + return dur +} + // notificationDispatcher manages subscription to events and broadcasts new events. 
func (bc *Blockchain) notificationDispatcher() { var ( @@ -1616,7 +1734,10 @@ func (bc *Blockchain) AddBlock(block *block.Block) error { var mp *mempool.Pool expectedHeight := bc.BlockHeight() + 1 if expectedHeight != block.Index { - return fmt.Errorf("expected %d, got %d: %w", expectedHeight, block.Index, ErrInvalidBlockIndex) + if block.Index > expectedHeight { + return fmt.Errorf("expected %d, got %d: %w", expectedHeight, block.Index, ErrInvalidBlockIndex) + } + return fmt.Errorf("%w: block with index %d is already on chain", ErrAlreadyExists, block.Index) } if bc.config.StateRootInHeader != block.StateRootEnabled { return fmt.Errorf("%w: %v != %v", @@ -1628,6 +1749,11 @@ func (bc *Blockchain) AddBlock(block *block.Block) error { if err != nil { return err } + } else { + expectedH := bc.GetHeaderHash(block.Index) + if expectedH != block.Hash() { + return fmt.Errorf("invalid block: hash mismatch: expected %s, got %s", expectedH.StringLE(), block.Hash().StringLE()) + } } if !bc.config.SkipBlockVerification { merkle := block.ComputeMerkleRoot() @@ -1676,7 +1802,7 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { if len(headers) > 0 { var i int curHeight := bc.HeaderHeight() - for i = range headers { + for ; i < len(headers); i++ { if headers[i].Index > curHeight { break } @@ -1686,17 +1812,33 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { if len(headers) == 0 { return nil - } else if verify { + } + // If it's a trusted header, verify its hash against configuration. + if headers[0].Index == bc.config.TrustedHeader.Index && !headers[0].Hash().Equals(bc.config.TrustedHeader.Hash) { + return fmt.Errorf("trusted header hash mismatch at %d: expected %s, got %s", headers[0].Index, bc.config.TrustedHeader.Hash.StringLE(), headers[0].Hash().StringLE()) + } + if verify { // Verify that the chain of the headers is consistent. - var lastHeader *block.Header - if lastHeader, err = bc.GetHeader(headers[0].PrevHash); err != nil { - return fmt.Errorf("previous header was not found: %w", err) - } - for _, h := range headers { - if err = bc.verifyHeader(h, lastHeader); err != nil { - return err + var ( + lastHeader *block.Header + verifyFrom uint32 + ) + if headers[0].Index == bc.config.TrustedHeader.Index { + verifyFrom++ + lastHeader = headers[0] + } + if len(headers) > int(verifyFrom) { + if lastHeader == nil { + if lastHeader, err = bc.GetHeader(headers[verifyFrom].PrevHash); err != nil { + return fmt.Errorf("previous header %d (%s) was not found: %w", headers[verifyFrom].Index, headers[verifyFrom].PrevHash.StringLE(), err) + } + } + for _, h := range headers[verifyFrom:] { + if err = bc.verifyHeader(h, lastHeader); err != nil { + return err + } + lastHeader = h } - lastHeader = h } } res := bc.HeaderHashes.addHeaders(headers...) @@ -1734,7 +1876,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. 
aerdone = make(chan error) - mtb = bc.GetMaxTraceableBlocks() ) go func() { var ( @@ -1745,31 +1886,8 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error transCache = make(map[util.Uint160]transferData) ) kvcache.StoreAsCurrentBlock(block) - if bc.config.Ledger.RemoveUntraceableBlocks { - var start, stop uint32 - if bc.config.P2PStateExchangeExtensions { - // remove batch of old blocks starting from P2-MaxTraceableBlocks-StateSyncInterval up to P2-MaxTraceableBlocks - if block.Index >= 2*uint32(bc.config.StateSyncInterval) && - block.Index >= uint32(bc.config.StateSyncInterval)+mtb && // check this in case if MaxTraceableBlocks>StateSyncInterval - int(block.Index)%bc.config.StateSyncInterval == 0 { - stop = block.Index - uint32(bc.config.StateSyncInterval) - mtb - start = stop - min(stop, uint32(bc.config.StateSyncInterval)) - } - } else if block.Index > mtb { - start = block.Index - mtb // is at least 1 - stop = start + 1 - } - for index := start; index < stop; index++ { - ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders) - if index%bc.config.Ledger.GarbageCollectionPeriod == 0 { - _ = bc.gcBlockTimes.Add(index, ts) - } - if err != nil { - bc.log.Warn("error while removing old block", - zap.Uint32("index", index), - zap.Error(err)) - } - } + if bc.config.Ledger.RemoveUntraceableBlocks && block.Index%bc.config.Ledger.GarbageCollectionPeriod == 0 { + _ = bc.gcBlockTimes.Add(block.Index, block.Timestamp) } for aer := range aerchan { if aer.Container == block.Hash() { @@ -1885,10 +2003,10 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error // because changes applied are the ones from HALTed transactions. return fmt.Errorf("error while trying to apply MPT changes: %w", err) } - if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index { + if bc.config.StateRootInHeader && bc.HeaderHeight() > sr.Index && sr.Index > bc.config.TrustedHeader.Index { h, err := bc.GetHeader(bc.GetHeaderHash(sr.Index + 1)) if err != nil { - err = fmt.Errorf("failed to get next header: %w", err) + err = fmt.Errorf("failed to get next header %d to verify stateroot: %w", sr.Index+1, err) } else if h.PrevStateRoot != sr.Root { err = fmt.Errorf("local stateroot and next header's PrevStateRoot mismatch: %s vs %s", sr.Root.StringBE(), h.PrevStateRoot.StringBE()) } diff --git a/pkg/core/blockchain_core_test.go b/pkg/core/blockchain_core_test.go index e4ed20348a..0743f29f6f 100644 --- a/pkg/core/blockchain_core_test.go +++ b/pkg/core/blockchain_core_test.go @@ -155,16 +155,19 @@ func checkNewBlockchainErr(t *testing.T, cfg func(c *config.Config), store stora } func TestNewBlockchainIncosistencies(t *testing.T) { - t.Run("untraceable blocks/headers", func(t *testing.T) { - checkNewBlockchainErr(t, func(c *config.Config) { - c.ApplicationConfiguration.RemoveUntraceableHeaders = true - }, storage.NewMemoryStore(), "RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not") - }) t.Run("state exchange without state root", func(t *testing.T) { checkNewBlockchainErr(t, func(c *config.Config) { c.ProtocolConfiguration.P2PStateExchangeExtensions = true }, storage.NewMemoryStore(), "P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off") }) + t.Run("trusted header without state sync extensions", func(t *testing.T) { + checkNewBlockchainErr(t, func(c *config.Config) { + c.ApplicationConfiguration.TrustedHeader = config.HashIndex{ + Hash: util.Uint256{1, 2, 3}, + Index: 1, + } + }, 
storage.NewMemoryStore(), "TrustedHeader can not be used without P2PStateExchangeExtensions or NeoFSStateSyncExtensions") + }) } func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { @@ -255,7 +258,7 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) { if stage == 0x03 { errText = "unknown state jump stage" } - checkNewBlockchainErr(t, spountCfg, bcSpout.dao.Store, errText) + checkNewBlockchainErr(t, spountCfg, bcSpout.dao.GetPrivate().Store, errText) // don't persist changes since bcSpout.dao.Store will be reused by subsequent tests. }) } } diff --git a/pkg/core/blockchain_neotest_test.go b/pkg/core/blockchain_neotest_test.go index f152ef099b..8076cc13d2 100644 --- a/pkg/core/blockchain_neotest_test.go +++ b/pkg/core/blockchain_neotest_test.go @@ -281,6 +281,183 @@ func TestBlockchain_StartFromExistingDB(t *testing.T) { }) } +func TestBlockchain_InitHeaderHashes(t *testing.T) { + var headerBatchCount = 2000 + + // Create source chain, fill it with some blocks. + bcSpout, validators := chain.NewSingle(t) + e := neotest.NewExecutor(t, bcSpout, validators, validators) + e.GenerateNewBlocks(t, 3*headerBatchCount) + + check := func(t *testing.T, headerHeight int, trusted *block.Header) { + // Create secondary node, add some headers and try to start from existing DB. + ps, path := newLevelDBForTestingWithPath(t, "") + bc, _ := chain.NewSingleWithCustomConfigAndStore(t, func(cfg *config.Blockchain) { + cfg.RemoveUntraceableBlocks = true // required for NeoFSStateSyncExtensions. + }, ps, false) + go bc.Run() + + // Add some headers to the DB and close chain. + for i := 1; i <= headerHeight; i++ { + h, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(i))) + require.NoError(t, err) + require.NoError(t, bc.AddHeaders(h)) + } + require.Equal(t, uint32(headerHeight), bc.HeaderHeight()) + bc.Close() + + // Open the chain one more time and start with some trusted header. + ps, _ = newLevelDBForTestingWithPath(t, path) + bc, _ = chain.NewSingleWithCustomConfigAndStore(t, func(cfg *config.Blockchain) { + cfg.RemoveUntraceableBlocks = true + cfg.NeoFSStateSyncExtensions = true + cfg.NeoFSBlockFetcher.Enabled = true + cfg.NeoFSStateFetcher.Enabled = true + cfg.TrustedHeader = config.HashIndex{ + Hash: trusted.Hash(), + Index: trusted.Index, + } + }, ps, false) + go bc.Run() + + // Check that header height and headers are in the proper state. + var expectedHeight uint32 + switch { + case uint32(headerHeight) < trusted.Index: + expectedHeight = trusted.Index - 1 // one block below trusted header since trusted header should be fetched from the network. + case uint32(headerHeight) == trusted.Index: + expectedHeight = trusted.Index // trusted header is already fetched. + case uint32(headerHeight) > trusted.Index: + expectedHeight = uint32(headerHeight) // headers chain is above trusted header. + } + require.Equal(t, int(expectedHeight), int(bc.HeaderHeight())) + + for i := range bc.HeaderHeight() { + actual := bc.GetHeaderHash(i) + switch { + case i < trusted.Index: + // Different setups imply different behaviour, and it's not that + // important what we have below the trusted height. 
+ case i == trusted.Index: + require.Equal(t, trusted.Hash(), actual) + actualH, err := bc.GetHeader(actual) + require.NoError(t, err) + expectedH, err := bcSpout.GetHeader(actual) + require.NoError(t, err) + require.Equal(t, expectedH, actualH) + case i > trusted.Index: + require.Equal(t, bcSpout.GetHeaderHash(i), actual, i) + actualH, err := bc.GetHeader(actual) + require.NoError(t, err) + expectedH, err := bcSpout.GetHeader(actual) + require.NoError(t, err) + require.Equal(t, expectedH, actualH) + } + } + bc.Close() + } + + t.Run("headerHeight < trustedHeight", func(t *testing.T) { + t.Run("start from genesis", func(t *testing.T) { + t.Run("batch 0", func(t *testing.T) { + t.Run("start", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(0)) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("middle", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(100)) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("end", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) - 1)) + require.NoError(t, err) + check(t, 0, trusted) + }) + }) + t.Run("batch 1", func(t *testing.T) { + t.Run("start", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount))) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("middle", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) + 1)) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("end", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(2*headerBatchCount) - 1)) + require.NoError(t, err) + check(t, 0, trusted) + }) + }) + t.Run("batch 2", func(t *testing.T) { + t.Run("start", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(2 * headerBatchCount))) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("middle", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(2*headerBatchCount) + 1)) + require.NoError(t, err) + check(t, 0, trusted) + }) + t.Run("end", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(3*headerBatchCount) - 1)) + require.NoError(t, err) + check(t, 0, trusted) + }) + }) + }) + t.Run("batch 0", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(8)) + require.NoError(t, err) + check(t, 3, trusted) + }) + t.Run("batch 0 + batch 1", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) + 10)) + require.NoError(t, err) + check(t, headerBatchCount-20, trusted) + }) + t.Run("batch 1", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) + 10)) + require.NoError(t, err) + check(t, headerBatchCount+5, trusted) + }) + }) + t.Run("headerHeight == trustedHeight", func(t *testing.T) { + t.Run("incomplete batch", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(8)) + require.NoError(t, err) + check(t, 8, trusted) + }) + t.Run("complete batch", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) - 1)) + require.NoError(t, err) + check(t, headerBatchCount-1, trusted) + }) + }) + t.Run("headerHeight > trustedHeight", func(t *testing.T) { + t.Run("batch 0", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(8)) + 
require.NoError(t, err) + check(t, 10, trusted) + }) + t.Run("batch 0 + batch 1", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) - 10)) + require.NoError(t, err) + check(t, headerBatchCount+20, trusted) + }) + t.Run("batch 1", func(t *testing.T) { + trusted, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(uint32(headerBatchCount) + 10)) + require.NoError(t, err) + check(t, headerBatchCount+20, trusted) + }) + }) +} + // TestBlockchain_InitializeNeoCache_Bug3181 is aimed to reproduce and check situation // when panic occures on native Neo cache initialization due to access to native Policy // cache when it's not yet initialized to recalculate candidates. @@ -618,6 +795,20 @@ func TestBlockchain_AddBadBlock(t *testing.T) { c.VerifyTransactions = true c.SkipBlockVerification = false }) + + // Add proper header, then try to add malicious block that doesn't match this header. + tx = e.NewUnsignedTx(t, neoHash, "transfer", acc.ScriptHash(), util.Uint160{1, 2, 3}, 1, nil) + e.SignTx(t, tx, -1, acc) + b = e.NewUnsignedBlock(t, tx) + e.SignBlock(b) + bc1, _ := chain.NewSingleWithCustomConfig(t, func(c *config.Blockchain) { + c.VerifyTransactions = true + c.SkipBlockVerification = false + }) + require.NoError(t, bc1.AddHeaders(&b.Header)) + b = e.NewUnsignedBlock(t, tx) // construct malicious block which is not even signed. + b.Timestamp++ + require.ErrorContains(t, bc1.AddBlock(b), "invalid block: hash mismatch") } func TestBlockchain_GetHeader(t *testing.T) { @@ -1043,6 +1234,11 @@ func TestBlockchain_Subscriptions(t *testing.T) { } func TestBlockchain_RemoveUntraceable(t *testing.T) { + const ( + headerBatchCount = 2000 // nested from HeaderHashes. + mtb = 3 // use small value to fit within the size of blockTimesCache in GC mode with short GCP. + gcp = 1 // use small value to check precisely removal of untraceable blocks. + ) neoCommitteeKey := []byte{0xfb, 0xff, 0xff, 0xff, 0x0e} check := func(t *testing.T, bc *core.Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) { _, _, err := bc.GetTransaction(tHash) @@ -1064,7 +1260,11 @@ func TestBlockchain_RemoveUntraceable(t *testing.T) { require.NoError(t, err) } _, err = bc.GetHeader(bHash) - require.NoError(t, err) + if errorExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } if !sHash.Equals(util.Uint256{}) { sm := bc.GetStateModule() _, err = sm.GetState(sHash, neoCommitteeKey) @@ -1077,47 +1277,74 @@ func TestBlockchain_RemoveUntraceable(t *testing.T) { } t.Run("P2PStateExchangeExtensions off", func(t *testing.T) { bc, acc := chain.NewSingleWithCustomConfig(t, func(c *config.Blockchain) { - c.MaxTraceableBlocks = 2 - c.Ledger.GarbageCollectionPeriod = 2 + c.MaxTraceableBlocks = mtb + c.Ledger.GarbageCollectionPeriod = gcp c.Ledger.RemoveUntraceableBlocks = true }) e := neotest.NewExecutor(t, bc, acc, acc) neoValidatorInvoker := e.ValidatorInvoker(e.NativeHash(t, nativenames.Neo)) + // Fill in the first batch of header hashes up to 2000 so that they are persisted to disk + // to enable untraceable blocks removal. 
+ e.GenerateNewBlocks(t, headerBatchCount-mtb) tx1Hash := neoValidatorInvoker.Invoke(t, true, "transfer", acc.ScriptHash(), util.Uint160{1, 2, 3}, 1, nil) tx1Height := bc.BlockHeight() b1 := e.TopBlock(t) sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height) require.NoError(t, err) - - neoValidatorInvoker.Invoke(t, true, "transfer", acc.ScriptHash(), util.Uint160{1, 2, 3}, 1, nil) + for e.Chain.BlockHeight() < tx1Height+mtb { + e.AddNewBlock(t) + } _, h1, err := bc.GetTransaction(tx1Hash) require.NoError(t, err) require.Equal(t, tx1Height, h1) check(t, bc, tx1Hash, b1.Hash(), sRoot.Root, false) - e.GenerateNewBlocks(t, 4) - sm := bc.GetStateModule() + // Remember some hashes for further check. + hashesTail := make([]util.Uint256, 0, bc.BlockHeight()) + for i := range bc.BlockHeight() { + hashesTail = append(hashesTail, bc.GetHeaderHash(i)) + } + + // Add one more block to make tx1 untraceable. + e.AddNewBlock(t) + require.Eventually(t, func() bool { - _, err = sm.GetState(sRoot.Root, neoCommitteeKey) + // Use block removal as a reliable trigger since it's the last thing that GC does. + _, err = bc.GetBlock(b1.Hash()) return err != nil }, 2*bcPersistInterval, 10*time.Millisecond) check(t, bc, tx1Hash, b1.Hash(), sRoot.Root, true) + + // Check that the full tail of untraceable blocks is removed. + for i, h := range hashesTail[:bc.BlockHeight()-mtb] { + _, err = bc.GetBlock(h) + require.Error(t, err, i) + } + + // Check that MTB-th block is reachable. + _, err = bc.GetBlock(hashesTail[bc.BlockHeight()-mtb]) + require.NoError(t, err) }) t.Run("P2PStateExchangeExtensions on", func(t *testing.T) { + const stateSyncInterval = 2 // use small value to fit within the size of blockTimesCache in GC mode with short GCP. bc, acc := chain.NewSingleWithCustomConfig(t, func(c *config.Blockchain) { - c.MaxTraceableBlocks = 2 - c.Ledger.GarbageCollectionPeriod = 2 + c.MaxTraceableBlocks = mtb + c.Ledger.GarbageCollectionPeriod = gcp c.Ledger.RemoveUntraceableBlocks = true c.P2PStateExchangeExtensions = true - c.StateSyncInterval = 2 + c.StateSyncInterval = stateSyncInterval c.StateRootInHeader = true }) e := neotest.NewExecutor(t, bc, acc, acc) neoValidatorInvoker := e.ValidatorInvoker(e.NativeHash(t, nativenames.Neo)) + // Fill in the first batch of header hashes up to 2000 so that they are persisted to disk + // to enable untraceable blocks removal. + e.GenerateNewBlocks(t, headerBatchCount-headerBatchCount%stateSyncInterval-stateSyncInterval-mtb) + tx1Hash := neoValidatorInvoker.Invoke(t, true, "transfer", acc.ScriptHash(), util.Uint160{1, 2, 3}, 1, nil) tx1Height := bc.BlockHeight() b1 := e.TopBlock(t) @@ -1132,18 +1359,42 @@ func TestBlockchain_RemoveUntraceable(t *testing.T) { require.NoError(t, err) require.Equal(t, tx1Height, h1) - e.GenerateNewBlocks(t, 3) + for e.Chain.BlockHeight() < tx1Height+2*stateSyncInterval+mtb-1 { + e.AddNewBlock(t) + } check(t, bc, tx1Hash, b1.Hash(), sRoot.Root, false) check(t, bc, tx2Hash, b2.Hash(), sRoot.Root, false) + // Remember some hashes for further check. + hashesTail := make([]util.Uint256, 0, bc.BlockHeight()) + for i := range bc.BlockHeight() { + hashesTail = append(hashesTail, bc.GetHeaderHash(i)) + } + e.AddNewBlock(t) + require.Eventually(t, func() bool { + // Use block removal as a reliable trigger since it's the last thing that GC does. 
+ _, err = bc.GetBlock(b1.Hash()) + return err != nil + }, 2*bcPersistInterval, 10*time.Millisecond) + check(t, bc, tx1Hash, b1.Hash(), util.Uint256{}, true) check(t, bc, tx2Hash, b2.Hash(), util.Uint256{}, false) _, h2, err := bc.GetTransaction(tx2Hash) require.NoError(t, err) require.Equal(t, tx2Height, h2) + + // Check that the full tail of untraceable blocks is removed. + for i, h := range hashesTail[:bc.BlockHeight()-mtb-2*stateSyncInterval+1] { + _, err = bc.GetBlock(h) + require.Error(t, err, i) + } + + // Check that MTB-th block is reachable. + _, err = bc.GetBlock(hashesTail[bc.BlockHeight()-mtb-2*stateSyncInterval+1]) + require.NoError(t, err) }) } diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 6c2b43acd8..1e0fd9770a 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -642,23 +642,17 @@ func (dao *Simple) GetHeaderHashes(height uint32) ([]util.Uint256, error) { return hashes, nil } -// DeleteHeaderHashes removes batches of header hashes starting from the one that +// DeleteHeaderHashesHead removes batches of header hashes starting from the one that // contains header with index `since` up to the most recent batch. It assumes that // all stored batches contain `batchSize` hashes. -func (dao *Simple) DeleteHeaderHashes(since uint32, batchSize int) { +func (dao *Simple) DeleteHeaderHashesHead(since uint32, batchSize int) { + first := dao.mkHeaderHashKey(since / uint32(batchSize) * uint32(batchSize)) dao.Store.Seek(storage.SeekRange{ - Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList), - Backwards: true, + Prefix: first[0:1], + Start: first[1:], }, func(k, _ []byte) bool { - first := binary.BigEndian.Uint32(k[1:]) - if first >= since { - dao.Store.Delete(k) - return first != since - } - if first+uint32(batchSize)-1 >= since { - dao.Store.Delete(k) - } - return false + dao.Store.Delete(k) + return true }) } @@ -830,21 +824,14 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a // DeleteBlock removes the block from dao. It's not atomic, so make sure you're // using private MemCached instance here. It returns block timestamp for GC // convenience. -func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) (uint64, error) { +func (dao *Simple) DeleteBlock(h util.Uint256) (uint64, error) { key := dao.makeExecutableKey(h) b, err := dao.getBlock(key) if err != nil { return 0, err } - if !dropHeader { - err = dao.storeHeader(key, &b.Header) - if err != nil { - return 0, err - } - } else { - dao.Store.Delete(key) - } + dao.Store.Delete(key) for _, tx := range b.Transactions { copy(key[1:], tx.Hash().BytesBE()) diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index ef6cd6fa59..02d578ae7d 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -109,14 +109,7 @@ func TestPutGetBlock(t *testing.T) { require.Equal(t, *appExecResult1, gotAppExecResult[0]) require.Equal(t, *appExecResult2, gotAppExecResult[1]) - ts, err := dao.DeleteBlock(hash, false) - require.NoError(t, err) - require.Equal(t, uint64(42), ts) - gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there. 
- require.NoError(t, err) - require.NotNil(t, gotBlock) - - ts, err = dao.DeleteBlock(hash, true) + ts, err := dao.DeleteBlock(hash) require.NoError(t, err) require.Equal(t, uint64(42), ts) _, err = dao.GetBlock(hash) diff --git a/pkg/core/headerhashes.go b/pkg/core/headerhashes.go index fc69bb8f35..7d16ea5065 100644 --- a/pkg/core/headerhashes.go +++ b/pkg/core/headerhashes.go @@ -6,6 +6,7 @@ import ( "sync" lru "github.com/hashicorp/golang-lru/v2" + "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/dao" "github.com/nspcc-dev/neo-go/pkg/util" @@ -38,29 +39,53 @@ type HeaderHashes struct { cache *lru.Cache[uint32, []util.Uint256] } -func (h *HeaderHashes) initGenesis(dao *dao.Simple, hash util.Uint256) { +func (h *HeaderHashes) initMinTrustedHeader(dao *dao.Simple, trusted config.HashIndex) { h.dao = dao h.cache, _ = lru.New[uint32, []util.Uint256](pagesCache) // Never errors for positive size. h.previous = make([]util.Uint256, headerBatchCount) h.latest = make([]util.Uint256, 0, headerBatchCount) - h.latest = append(h.latest, hash) - dao.PutCurrentHeader(hash, 0) -} -func (h *HeaderHashes) init(dao *dao.Simple) error { - h.dao = dao - h.cache, _ = lru.New[uint32, []util.Uint256](pagesCache) // Never errors for positive size. + // For non-genesis block, trusted header is not yet in the storage. Use a + // stub and pretend that the latest header is `trusted.Index-1` to fetch + // the next one (trusted) from the network. + if trusted.Index > 0 { + trusted.Index-- + trusted.Hash = util.Uint256{} + } + + for range trusted.Index % headerBatchCount { + h.latest = append(h.latest, util.Uint256{}) + } + h.latest = append(h.latest, trusted.Hash) + dao.PutCurrentHeader(trusted.Hash, trusted.Index) + h.storedHeaderCount = (trusted.Index /*trusted header is not yet in the storage*/ / headerBatchCount) * headerBatchCount + + // Store trusted header if it's the last header in the batch and update storedHeaderCount. + _ = h.tryStoreBatch(dao) // ignore serialization error. - currHeaderHeight, currHeaderHash, err := h.dao.GetCurrentHeaderHeight() + updateHeaderHeightMetric(trusted.Index) +} + +func (h *HeaderHashes) init(dao *dao.Simple, trusted config.HashIndex) error { + currHeaderHeight, currHeaderHash, err := dao.GetCurrentHeaderHeight() if err != nil { return fmt.Errorf("failed to retrieve current header info: %w", err) } - h.storedHeaderCount = ((currHeaderHeight + 1) / headerBatchCount) * headerBatchCount + if currHeaderHeight < trusted.Index { + h.initMinTrustedHeader(dao, trusted) + return nil + } - if h.storedHeaderCount >= headerBatchCount { + h.dao = dao + h.cache, _ = lru.New[uint32, []util.Uint256](pagesCache) // Never errors for positive size. 
+ h.storedHeaderCount = ((currHeaderHeight + 1) / headerBatchCount) * headerBatchCount + missingHeaderCount := ((trusted.Index + 1) / headerBatchCount) * headerBatchCount + if h.storedHeaderCount >= headerBatchCount && + ((h.storedHeaderCount > missingHeaderCount && h.storedHeaderCount-missingHeaderCount >= headerBatchCount) || + currHeaderHeight%headerBatchCount != trusted.Index%headerBatchCount) { h.previous, err = h.dao.GetHeaderHashes(h.storedHeaderCount - headerBatchCount) if err != nil { - return fmt.Errorf("failed to retrieve header hash page %d: %w", h.storedHeaderCount-headerBatchCount, err) + return fmt.Errorf("failed to retrieve header hash page %d: %w; stored: %d, missing: %d, trusted: %d, curr: %d", h.storedHeaderCount-headerBatchCount, err, h.storedHeaderCount, missingHeaderCount, trusted.Index, currHeaderHeight) } } else { h.previous = make([]util.Uint256, headerBatchCount) @@ -71,22 +96,37 @@ func (h *HeaderHashes) init(dao *dao.Simple) error { // batch of 2000 headers was stored. Via the currentHeaders stored we can sync // that with stored blocks. if currHeaderHeight >= h.storedHeaderCount { - hash := currHeaderHash - var targetHash util.Uint256 + var ( + hash = currHeaderHash + targetHash util.Uint256 + padLeft bool + ) if h.storedHeaderCount >= headerBatchCount { targetHash = h.previous[len(h.previous)-1] } + if targetHash.Equals(util.Uint256{}) && trusted.Index > 0 { + // Don't retrieve header hashes prior to trusted header (if set) since sometimes + // these blocks may be missing from the storage (if previously node was started + // from existing DB with some trusted point higher than header height). These hashes + // are useless for further node operation anyway. + targetHash = trusted.Hash + padLeft = true + } headers := make([]util.Uint256, 0, headerBatchCount) for hash != targetHash { blk, err := h.dao.GetBlock(hash) if err != nil { - return fmt.Errorf("could not get header %s: %w", hash, err) + return fmt.Errorf("could not get header %s: %w", hash.StringLE(), err) } headers = append(headers, blk.Hash()) hash = blk.PrevHash } slices.Reverse(headers) + if padLeft { + h.latest = h.latest[:currHeaderHeight-uint32(len(headers))] + h.latest = append(h.latest, trusted.Hash) + } h.latest = append(h.latest, headers...) } return nil @@ -124,14 +164,9 @@ func (h *HeaderHashes) addHeaders(headers ...*block.Header) error { } lastHeader = head h.latest = append(h.latest, head.Hash()) - if len(h.latest) == headerBatchCount { - err = batch.StoreHeaderHashes(h.latest, h.storedHeaderCount) - if err != nil { - return err - } - copy(h.previous, h.latest) - h.latest = h.latest[:0] - h.storedHeaderCount += headerBatchCount + err = h.tryStoreBatch(batch) + if err != nil { + return fmt.Errorf("failed to store batch of header hashes: %w", err) } } if lastHeader != nil { @@ -144,6 +179,22 @@ func (h *HeaderHashes) addHeaders(headers ...*block.Header) error { return nil } +// tryStoreBatch stores the current batch of header hashes to the storage in +// case if batch is full. It also initializes HeaderHashes for the next batch +// processing. It does not persist dao. +func (h *HeaderHashes) tryStoreBatch(d *dao.Simple) error { + if len(h.latest) == headerBatchCount { + err := d.StoreHeaderHashes(h.latest, h.storedHeaderCount) + if err != nil { + return err + } + copy(h.previous, h.latest) + h.latest = h.latest[:0] + h.storedHeaderCount += headerBatchCount + } + return nil +} + // CurrentHeaderHash returns the hash of the latest known header. 
func (h *HeaderHashes) CurrentHeaderHash() util.Uint256 { var hash util.Uint256 diff --git a/pkg/core/mpt/trie_store.go b/pkg/core/mpt/trie_store.go index e98859e062..fd8c07d83a 100644 --- a/pkg/core/mpt/trie_store.go +++ b/pkg/core/mpt/trie_store.go @@ -114,7 +114,7 @@ func (m *TrieStore) Seek(rng storage.SeekRange, f func(k, v []byte) bool) { } // SeekGC implements the Store interface. It's not supported, so it always // returns [errors.ErrUnsupported]. -func (m *TrieStore) SeekGC(rng storage.SeekRange, keep func(k, v []byte) bool) error { +func (m *TrieStore) SeekGC(rng storage.SeekRange, keep func(k, v []byte) (bool, bool)) error { return fmt.Errorf("%w: SeekGC is not supported", errors.ErrUnsupported) } diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index e06c8a3287..81ee2b56e1 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -308,17 +308,17 @@ func (s *Module) GC(index uint32, store storage.Store) time.Duration { start := time.Now() err := store.SeekGC(storage.SeekRange{ Prefix: []byte{byte(storage.DataMPT)}, - }, func(k, v []byte) bool { + }, func(k, v []byte) (bool, bool) { stored++ if !mpt.IsActiveValue(v) { h := binary.LittleEndian.Uint32(v[len(v)-4:]) if h <= index { removed++ stored-- - return false + return false, true } } - return true + return true, true }) dur := time.Since(start) if err != nil { diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 014fad808d..c0658e5b6e 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -177,6 +177,12 @@ func (s *Module) Init(currChainHeight uint32) error { return nil } if s.bc.BlockHeight() > p-2*s.syncInterval { + trustedH := s.bc.GetConfig().TrustedHeader.Index + if trustedH > s.bc.BlockHeight() { + return fmt.Errorf("misconfigured trusted header height: chain is already in sync (block height is %d, lower bound of latest sync interval is %d), but trusted height %d is higher than the block height; reset trusted header to a proper height", + s.bc.BlockHeight(), p-2*s.syncInterval, trustedH) + } + // chain has already been synchronised up to old state sync point and regular blocks processing was started. + // Current block height is enough to start regular blocks processing. 
s.syncStage = inactive @@ -446,9 +452,13 @@ func (s *Module) AddBlock(block *block.Block) error { if !s.bc.GetConfig().SkipBlockVerification { merkle := block.ComputeMerkleRoot() if !block.MerkleRoot.Equals(merkle) { - return errors.New("invalid block: MerkleRoot mismatch") + return fmt.Errorf("invalid block: MerkleRoot mismatch: expected %s, got %s", merkle.StringLE(), block.MerkleRoot.StringLE()) } } + expectedH := s.bc.GetHeaderHash(block.Index) + if !block.Hash().Equals(expectedH) { + return fmt.Errorf("invalid block: hash mismatch: expected %s, got %s", expectedH, block.Hash().StringLE()) + } cache := s.dao.GetPrivate() if err := cache.StoreAsBlock(block, nil, nil); err != nil { return err diff --git a/pkg/core/statesync/neotest_test.go b/pkg/core/statesync/neotest_test.go index f777712134..c85f82bfa4 100644 --- a/pkg/core/statesync/neotest_test.go +++ b/pkg/core/statesync/neotest_test.go @@ -361,6 +361,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { stateSyncInterval = 4 maxTraceable = 6 stateSyncPoint = 24 + trustedHeader = stateSyncPoint - 2*maxTraceable + 2 ) spoutCfg := func(c *config.Blockchain) { c.Ledger.KeepOnlyLatestState = spoutEnableGC @@ -368,6 +369,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { c.StateRootInHeader = true c.StateSyncInterval = stateSyncInterval c.MaxTraceableBlocks = maxTraceable + c.P2PStateExchangeExtensions = true // a tiny hack to avoid removal of untraceable headers from spout chain. c.Hardforks = map[string]uint32{ config.HFAspidochelone.String(): 0, config.HFBasilisk.String(): 0, @@ -385,7 +387,8 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { e := neotest.NewExecutor(t, bcSpout, validators, committee) basicchain.Init(t, "../../../", e) - // make spout chain higher than latest state sync point (add several blocks up to stateSyncPoint+2) + // make spout chain higher than latest state sync point (add several blocks up to stateSyncPoint+2), + // consider keeping in sync with trustedHeader. e.AddNewBlock(t) e.AddNewBlock(t) // This block is stateSyncPoint-th block. e.AddNewBlock(t) @@ -393,14 +396,23 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { boltCfg := func(c *config.Blockchain) { spoutCfg(c) + c.P2PStateExchangeExtensions = true c.Ledger.KeepOnlyLatestState = true c.Ledger.RemoveUntraceableBlocks = true - if enableStorageSync { + c.P2PStateExchangeExtensions = false c.NeoFSStateSyncExtensions = true c.NeoFSStateFetcher.Enabled = true c.NeoFSBlockFetcher.Enabled = true } + if spoutEnableGC { + // Use trusted header because spout chain doesn't have full header hashes chain + // (they are removed along with old blocks/headers). 
+ c.TrustedHeader = config.HashIndex{ + Hash: bcSpout.GetHeaderHash(trustedHeader), + Index: trustedHeader, + } + } } bcBoltStore := storage.NewMemoryStore() bcBolt, _, _ := chain.NewMultiWithCustomConfigAndStore(t, boltCfg, bcBoltStore, false) @@ -408,9 +420,9 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { module := bcBolt.GetStateSyncModule() t.Run("error: add headers before initialisation", func(t *testing.T) { - h, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(1)) + h, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(bcSpout.HeaderHeight() - maxTraceable + 1)) require.NoError(t, err) - require.Error(t, module.AddHeaders(h)) + require.ErrorContains(t, module.AddHeaders(h), "headers were not requested") }) t.Run("no error: add blocks before initialisation", func(t *testing.T) { b, err := bcSpout.GetBlock(bcSpout.GetHeaderHash(bcSpout.BlockHeight())) @@ -439,7 +451,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { }) }) t.Run("error: add MPT nodes without initialisation", func(t *testing.T) { - require.Error(t, module.AddMPTNodes([][]byte{})) + require.ErrorContains(t, module.AddMPTNodes([][]byte{}), "MPT nodes were not requested") }) } @@ -450,11 +462,15 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) { require.False(t, module.NeedStorageData()) require.Panics(t, func() { module.BlockHeight() }) - // add headers to module + // add headers to module starting from trusted height headers := make([]*block.Header, 0, bcSpout.HeaderHeight()) - for i := uint32(1); i <= bcSpout.HeaderHeight(); i++ { + start := 1 + if spoutEnableGC { + start = trustedHeader + } + for i := uint32(start); i <= bcSpout.HeaderHeight(); i++ { h, err := bcSpout.GetHeader(bcSpout.GetHeaderHash(i)) - require.NoError(t, err) + require.NoError(t, err, i) headers = append(headers, h) } require.NoError(t, module.AddHeaders(headers...)) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 3495f6d24d..9a0aacdc42 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -110,14 +110,15 @@ func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, stores map[string][]b } // SeekGC implements the Store interface. -func (s *BoltDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { +func (s *BoltDBStore) SeekGC(rng SeekRange, keepCont func(k, v []byte) (bool, bool)) error { return boltSeek(s.db.Update, rng, func(c *bbolt.Cursor, k, v []byte) (bool, error) { - if !keep(k, v) { + keep, cont := keepCont(k, v) + if !keep { if err := c.Delete(); err != nil { return false, err } } - return true, nil + return cont, nil }) } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index be2537c6e0..4e1c03adaf 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -76,20 +76,21 @@ func (s *LevelDBStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } // SeekGC implements the Store interface. 
-func (s *LevelDBStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { +func (s *LevelDBStore) SeekGC(rng SeekRange, keepCont func(k, v []byte) (bool, bool)) error { tx, err := s.db.OpenTransaction() if err != nil { return err } iter := tx.NewIterator(seekRangeToPrefixes(rng), nil) s.seek(iter, rng.Backwards, func(k, v []byte) bool { - if !keep(k, v) { + keep, cont := keepCont(k, v) + if !keep { err = tx.Delete(k, nil) if err != nil { return false } } - return true + return cont }) if err != nil { return err diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index 399a269df2..1db8556c40 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -153,9 +153,9 @@ func (s *MemCachedStore) PutChangeSet(puts map[string][]byte, stores map[string] } // Seek implements the Store interface. -func (s *MemCachedStore) Seek(rng SeekRange, f func(k, v []byte) bool) { +func (s *MemCachedStore) Seek(rng SeekRange, cont func(k, v []byte) bool) { ps, memRes := s.prepareSeekMemSnapshot(rng) - performSeek(context.Background(), ps, memRes, rng, false, f) + performSeek(context.Background(), ps, memRes, rng, false, cont) } // GetStorageChanges returns all current storage changes. It can only be done for private @@ -231,7 +231,7 @@ func (s *MemCachedStore) prepareSeekMemSnapshot(rng SeekRange) (Store, []KeyValu // `cutPrefix` denotes whether provided key needs to be cut off the resulting keys. // `rng` specifies prefix items must match and point to start seeking from. Backwards // seeking from some point is supported with corresponding `rng` field set. -func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng SeekRange, cutPrefix bool, f func(k, v []byte) bool) { +func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng SeekRange, cutPrefix bool, cont func(k, v []byte) bool) { lPrefix := len(string(rng.Prefix)) var cmpFunc = getCmpFunc(rng.Backwards) @@ -273,7 +273,7 @@ func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng See if cutPrefix { kvMem.Key = kvMem.Key[lPrefix:] } - if !f(kvMem.Key, kvMem.Value) { + if !cont(kvMem.Key, kvMem.Value) { done = true return false } @@ -290,7 +290,7 @@ func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng See if cutPrefix { kvPs.Key = kvPs.Key[lPrefix:] } - if !f(kvPs.Key, kvPs.Value) { + if !cont(kvPs.Key, kvPs.Value) { done = true return false } @@ -319,7 +319,7 @@ func performSeek(ctx context.Context, ps Store, memRes []KeyValueExists, rng See if cutPrefix { kvMem.Key = kvMem.Key[lPrefix:] } - if !f(kvMem.Key, kvMem.Value) { + if !cont(kvMem.Key, kvMem.Value) { break loop } } diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index cc041b4214..e6299f83ae 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -318,7 +318,7 @@ func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string][]byte) error } func (b *BadStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } -func (b *BadStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { +func (b *BadStore) SeekGC(rng SeekRange, keepCont func(k, v []byte) (bool, bool)) error { return nil } func (b *BadStore) Close() error { diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index dff82f8b3d..f96d32c9b8 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -77,7 +77,7 @@ func (s 
*MemoryStore) Seek(rng SeekRange, f func(k, v []byte) bool) { } // SeekGC implements the Store interface. -func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { +func (s *MemoryStore) SeekGC(rng SeekRange, keepCont func(k, v []byte) (bool, bool)) error { noop := func() {} // Keep RW lock for the whole Seek time, state must be consistent across whole // operation and we call delete in the handler. @@ -85,10 +85,11 @@ func (s *MemoryStore) SeekGC(rng SeekRange, keep func(k, v []byte) bool) error { // We still need to perform normal seek, some GC operations can be // sensitive to the order of KV pairs. s.seek(rng, func(k, v []byte) bool { - if !keep(k, v) { + keep, cont := keepCont(k, v) + if !keep { delete(s.chooseMap(k), string(k)) } - return true + return cont }, noop, noop) s.mut.Unlock() return nil diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index f88f965bb3..68c253f470 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -94,10 +94,11 @@ type ( // Seek can guarantee that key-value items are sorted by key in ascending way. Seek(rng SeekRange, f func(k, v []byte) bool) // SeekGC is similar to Seek, but the function should return true if current - // KV pair should be kept and false if it's to be deleted; there is no way to - // do an early exit here. SeekGC only works with the current Store, it won't - // go down to layers below and it takes a full write lock, so use it carefully. - SeekGC(rng SeekRange, keep func(k, v []byte) bool) error + // KV pair should be kept and false if it's to be deleted; the second return + // value denotes whether to continue iteration. SeekGC only works with the + // current Store, it won't go down to layers below and it takes a full write + // lock, so use it carefully. + SeekGC(rng SeekRange, keepCont func(k, v []byte) (bool, bool)) error Close() error } diff --git a/pkg/core/storage/storeandbatch_test.go b/pkg/core/storage/storeandbatch_test.go index cb670e2c51..cbf1ea7560 100644 --- a/pkg/core/storage/storeandbatch_test.go +++ b/pkg/core/storage/storeandbatch_test.go @@ -201,16 +201,16 @@ func testStoreSeek(t *testing.T, s Store) { func testStoreSeekGC(t *testing.T, s Store) { kvs := pushSeekDataSet(t, s) - err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) bool { - return true + err := s.SeekGC(SeekRange{Prefix: []byte("1")}, func(k, v []byte) (bool, bool) { + return true, true }) require.NoError(t, err) for i := range kvs { _, err = s.Get(kvs[i].Key) require.NoError(t, err) } - err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) bool { - return false + err = s.SeekGC(SeekRange{Prefix: []byte("3")}, func(k, v []byte) (bool, bool) { + return false, true }) require.NoError(t, err) for i := range kvs[:5] { diff --git a/pkg/crypto/keys/publickey.go b/pkg/crypto/keys/publickey.go index 3bce10de9a..d09ad5a009 100644 --- a/pkg/crypto/keys/publickey.go +++ b/pkg/crypto/keys/publickey.go @@ -18,6 +18,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/emit" + "gopkg.in/yaml.v3" ) // coordLen is the number of bytes in serialized X or Y coordinate. @@ -388,9 +389,9 @@ func (p *PublicKey) MarshalYAML() (any, error) { } // UnmarshalYAML implements the YAML unmarshaler interface. 
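The remaining hunks migrate custom YAML unmarshalers from the yaml.v2 functional signature to the yaml.v3 node-based one; each method body stays the same, with node.Decode replacing the injected unmarshal callback. A condensed, self-contained sketch of the pattern on a made-up hexByte type (the type and error text are mine, not the patch's):

package main

import (
	"encoding/hex"
	"fmt"

	"gopkg.in/yaml.v3"
)

type hexByte byte

// Old yaml.v2 style for comparison:
// func (h *hexByte) UnmarshalYAML(unmarshal func(any) error) error { ... }

// New yaml.v3 style: decode the scalar straight from the node.
func (h *hexByte) UnmarshalYAML(node *yaml.Node) error {
	var s string
	if err := node.Decode(&s); err != nil {
		return err
	}
	b, err := hex.DecodeString(s)
	if err != nil || len(b) != 1 {
		return fmt.Errorf("expected a single hex-encoded byte, got %q", s)
	}
	*h = hexByte(b[0])
	return nil
}

func main() {
	var h hexByte
	if err := yaml.Unmarshal([]byte("ff"), &h); err != nil {
		panic(err)
	}
	fmt.Println(h) // 255
}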
-func (p *PublicKey) UnmarshalYAML(unmarshal func(any) error) error { +func (p *PublicKey) UnmarshalYAML(node *yaml.Node) error { var s string - err := unmarshal(&s) + err := node.Decode(&s) if err != nil { return err } diff --git a/pkg/encoding/fixedn/fixed8.go b/pkg/encoding/fixedn/fixed8.go index 46adfeb215..d8bbb70443 100644 --- a/pkg/encoding/fixedn/fixed8.go +++ b/pkg/encoding/fixedn/fixed8.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/nspcc-dev/neo-go/pkg/io" + "gopkg.in/yaml.v3" ) const ( @@ -86,9 +87,9 @@ func (f *Fixed8) UnmarshalJSON(data []byte) error { } // UnmarshalYAML implements the yaml unmarshaler interface. -func (f *Fixed8) UnmarshalYAML(unmarshal func(any) error) error { +func (f *Fixed8) UnmarshalYAML(node *yaml.Node) error { var s string - err := unmarshal(&s) + err := node.Decode(&s) if err != nil { return err } diff --git a/pkg/network/bqueue/operationmode_string.go b/pkg/network/bqueue/operationmode_string.go new file mode 100644 index 0000000000..4781185c2b --- /dev/null +++ b/pkg/network/bqueue/operationmode_string.go @@ -0,0 +1,24 @@ +// Code generated by "stringer -type=OperationMode"; DO NOT EDIT. + +package bqueue + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[NonBlocking-0] + _ = x[Blocking-1] +} + +const _OperationMode_name = "NonBlockingBlocking" + +var _OperationMode_index = [...]uint8{0, 11, 19} + +func (i OperationMode) String() string { + if i >= OperationMode(len(_OperationMode_index)-1) { + return "OperationMode(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _OperationMode_name[_OperationMode_index[i]:_OperationMode_index[i+1]] +} diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 445fee2b97..d6321063e8 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -15,6 +15,8 @@ type Queuer[Q Queueable] interface { Height() uint32 } +//go:generate stringer -type=OperationMode + // OperationMode is the mode of operation for the queue. // Could be either Blocking or NonBlocking. type OperationMode byte @@ -116,10 +118,10 @@ func (bq *Queue[Q]) Run() { // The element might already be added by the consensus. 
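The generated stringer pays off in the logging hunk below: the queue's mode goes through zap.Stringer and renders symbolically, and the error travels as a typed zap.Error field instead of a pre-formatted string. A self-contained sketch (the logger setup, field values and error text are illustrative):

package main

import (
	"errors"

	"go.uber.org/zap"
)

// OperationMode is redeclared here so the snippet stands alone; String()
// condenses what `stringer -type=OperationMode` generates.
type OperationMode byte

const (
	NonBlocking OperationMode = iota
	Blocking
)

func (m OperationMode) String() string {
	return [...]string{"NonBlocking", "Blocking"}[m]
}

func main() {
	log := zap.NewExample()
	err := errors.New("block is already on the chain") // made-up error text
	log.Warn("queue: failed to add item into the blockchain",
		zap.Uint32("index", 101),
		zap.Uint32("chainHeight", 100),
		zap.Stringer("mode", Blocking), // renders as "Blocking", not a bare 1
		zap.Error(err),
	)
}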
if bq.chain.Height() < b.GetIndex() { bq.log.Warn("queue: failed to add item into the blockchain", - zap.Int("mode", int(bq.mode)), - zap.String("error", err.Error()), - zap.Uint32("height", bq.chain.Height()), - zap.Uint32("nextIndex", b.GetIndex())) + zap.Uint32("index", b.GetIndex()), + zap.Uint32("chainHeight", bq.chain.Height()), + zap.Stringer("mode", bq.mode), + zap.Error(err)) } } else if bq.relayF != nil { bq.relayF(b) diff --git a/pkg/network/server.go b/pkg/network/server.go index 60b9217f76..683bbfb412 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -801,7 +801,7 @@ func (s *Server) getVersionMsg(localAddr net.Addr) (*Message, error) { }) } cfg := s.chain.GetConfig() - if !cfg.RemoveUntraceableBlocks && !cfg.RemoveUntraceableHeaders { + if !cfg.RemoveUntraceableBlocks { capabilities = append(capabilities, capability.Capability{ Type: capability.ArchivalNode, Data: &capability.Archival{}, diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index d0bdac953c..5d38dca066 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -2770,7 +2770,7 @@ func getRelayResult(err error, hash util.Uint256) (any, *neorpc.Error) { }, nil case errors.Is(err, core.ErrTxExpired): return nil, neorpc.WrapErrorWithData(neorpc.ErrExpiredTransaction, err.Error()) - case errors.Is(err, core.ErrAlreadyExists) || errors.Is(err, core.ErrInvalidBlockIndex): + case errors.Is(err, core.ErrAlreadyExists): return nil, neorpc.WrapErrorWithData(neorpc.ErrAlreadyExists, err.Error()) case errors.Is(err, core.ErrAlreadyInPool): return nil, neorpc.WrapErrorWithData(neorpc.ErrAlreadyInPool, err.Error()) diff --git a/pkg/services/rpcsrv/server_test.go b/pkg/services/rpcsrv/server_test.go index 09a16bea32..d1c4be9e55 100644 --- a/pkg/services/rpcsrv/server_test.go +++ b/pkg/services/rpcsrv/server_test.go @@ -2753,59 +2753,6 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) [] require.Equal(t, trigger.PostPersist, res.Executions[1].Trigger) require.Equal(t, vmstate.Halt, res.Executions[1].VMState) }) - t.Run("submitblock", func(t *testing.T) { - rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitblock", "params": ["%s"]}` - t.Run("invalid signature", func(t *testing.T) { - s := testchain.NewBlock(t, chain, 1, 0) - s.Script.VerificationScript[8] ^= 0xff - body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, s)), httpSrv.URL, t) - checkErrGetResult(t, body, true, neorpc.ErrVerificationFailedCode) - }) - - t.Run("invalid height", func(t *testing.T) { - b := testchain.NewBlock(t, chain, 2, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 1, false)) - body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t) - checkErrGetResult(t, body, true, neorpc.ErrAlreadyExistsCode) - }) - t.Run("invalid script", func(t *testing.T) { - b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, 0xDD, 10, 0, 1, false)) - body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t) - checkErrGetResult(t, body, true, neorpc.ErrInvalidScriptCode) - }) - t.Run("invalid ValidUntilBlock", func(t *testing.T) { - b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 0, 0, 1, false)) - body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t) - checkErrGetResult(t, body, true, neorpc.ErrExpiredTransactionCode) - }) - t.Run("invalid SystemFee", func(t *testing.T) { - b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 
999999999999, 1, false))
-			body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t)
-			checkErrGetResult(t, body, true, neorpc.ErrPolicyFailedCode)
-		})
-		t.Run("invalid NetworkFee", func(t *testing.T) {
-			b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 0, false))
-			body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t)
-			checkErrGetResult(t, body, true, neorpc.ErrInsufficientNetworkFeeCode)
-		})
-		t.Run("invalid attribute", func(t *testing.T) {
-			b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 2, true))
-			body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t)
-			checkErrGetResult(t, body, true, neorpc.ErrInvalidAttributeCode)
-		})
-		t.Run("insufficient funds", func(t *testing.T) {
-			b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 899999999999, 1, false))
-			body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t)
-			checkErrGetResult(t, body, true, neorpc.ErrInsufficientFundsCode)
-		})
-		t.Run("positive", func(t *testing.T) {
-			b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 1, false))
-			body := doRPCCall(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t)
-			data := checkErrGetResult(t, body, false, 0)
-			var res = new(result.RelayResult)
-			require.NoError(t, json.Unmarshal(data, res))
-			require.Equal(t, b.Hash(), res.Hash)
-		})
-	})
 	t.Run("getproof", func(t *testing.T) {
 		r, err := chain.GetStateModule().GetStateRoot(3)
 		require.NoError(t, err)
@@ -3041,7 +2988,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
 		require.NoErrorf(t, err, "could not parse response: %s", txOut)
 
 		assert.Equal(t, *block.Transactions[0], actual.Transaction)
-		assert.Equal(t, 24, actual.Confirmations)
+		assert.Equal(t, 23, actual.Confirmations)
 		assert.Equal(t, TXHash, actual.Transaction.Hash())
 	})
 
@@ -3635,6 +3582,75 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
 	})
 }
 
+// TestRPC_SubmitBlock is a special one since every test case corrupts the chain with an
+// improperly constructed header, making it impossible to add further blocks.
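Moving these cases out of the shared testRPCProtocol run and giving each one a fresh chain is the standard remedy for state-poisoning subtests. Reduced to its essentials, the pattern looks like this (a toy _test.go sketch with illustrative names, not the real helpers):

package example

import "testing"

type fixture struct{ poisoned bool }

// newFixture builds fresh state for every caller, the same way each
// submitblock case below spins up its own chain and RPC server.
func newFixture(t *testing.T) *fixture {
	t.Helper()
	return &fixture{}
}

func TestMutatingSubtests(t *testing.T) {
	t.Run("corrupting case", func(t *testing.T) {
		f := newFixture(t)
		f.poisoned = true // damage stays local to this subtest
	})
	t.Run("clean case", func(t *testing.T) {
		f := newFixture(t)
		if f.poisoned {
			t.Fatal("fixture must start pristine")
		}
	})
}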
+func TestRPC_SubmitBlock(t *testing.T) { + rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitblock", "params": ["%s"]}` + check := func(t *testing.T, newBlock func(chain *core.Blockchain) *block.Block, errCode int64) { + chain, _, httpSrv := initClearServerWithInMemoryChain(t) + b := newBlock(chain) + body := doRPCCallOverHTTP(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t) + checkErrGetResult(t, body, true, errCode) + } + + t.Run("invalid signature", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + b := testchain.NewBlock(t, chain, 1, 0) + b.Script.VerificationScript[8] ^= 0xff + return b + }, neorpc.ErrVerificationFailedCode) + }) + t.Run("already exists", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 0, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 1, false)) + }, neorpc.ErrAlreadyExistsCode) + }) + t.Run("invalid height", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 2, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 1, false)) + }, neorpc.ErrVerificationFailedCode) + }) + t.Run("invalid script", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, 0xDD, 10, 0, 1, false)) + }, neorpc.ErrInvalidScriptCode) + }) + t.Run("invalid ValidUntilBlock", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 0, 0, 1, false)) + }, neorpc.ErrExpiredTransactionCode) + }) + t.Run("invalid SystemFee", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 999999999999, 1, false)) + }, neorpc.ErrPolicyFailedCode) + }) + t.Run("invalid NetworkFee", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 0, false)) + }, neorpc.ErrInsufficientNetworkFeeCode) + }) + t.Run("invalid attribute", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 2, true)) + }, neorpc.ErrInvalidAttributeCode) + }) + t.Run("insufficient funds", func(t *testing.T) { + check(t, func(chain *core.Blockchain) *block.Block { + return testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 899999999999, 1, false)) + }, neorpc.ErrInsufficientFundsCode) + }) + t.Run("positive", func(t *testing.T) { + chain, _, httpSrv := initClearServerWithInMemoryChain(t) + b := testchain.NewBlock(t, chain, 1, 0, newTxWithParams(t, chain, opcode.PUSH1, 10, 0, 1, false)) + body := doRPCCallOverHTTP(fmt.Sprintf(rpc, encodeBinaryToString(t, b)), httpSrv.URL, t) + data := checkErrGetResult(t, body, false, 0) + var res = new(result.RelayResult) + require.NoError(t, json.Unmarshal(data, res)) + require.Equal(t, b.Hash(), res.Hash) + }) +} + func (e *executor) getHeader(s string) *block.Header { hash, err := util.Uint256DecodeStringLE(s) if err != nil { diff --git a/pkg/smartcontract/binding/override.go b/pkg/smartcontract/binding/override.go index ed861a0267..52f95bc722 100644 --- a/pkg/smartcontract/binding/override.go +++ b/pkg/smartcontract/binding/override.go @@ -2,6 +2,8 @@ package binding import ( "strings" + + 
"gopkg.in/yaml.v3" ) // Override contains a package and a type to replace manifest method parameter type with. @@ -48,10 +50,10 @@ func NewOverrideFromString(s string) Override { } // UnmarshalYAML implements the YAML Unmarshaler interface. -func (o *Override) UnmarshalYAML(unmarshal func(any) error) error { +func (o *Override) UnmarshalYAML(node *yaml.Node) error { var s string - err := unmarshal(&s) + err := node.Decode(&s) if err != nil { return err } diff --git a/pkg/smartcontract/callflag/call_flags.go b/pkg/smartcontract/callflag/call_flags.go index 03eebd0aaf..e157aa6fdd 100644 --- a/pkg/smartcontract/callflag/call_flags.go +++ b/pkg/smartcontract/callflag/call_flags.go @@ -4,6 +4,8 @@ import ( "encoding/json" "errors" "strings" + + "gopkg.in/yaml.v3" ) // CallFlag represents a call flag. @@ -124,10 +126,10 @@ func (f CallFlag) MarshalYAML() (any, error) { } // UnmarshalYAML implements the YAML unmarshaler interface. -func (f *CallFlag) UnmarshalYAML(unmarshal func(any) error) error { +func (f *CallFlag) UnmarshalYAML(node *yaml.Node) error { var s string - err := unmarshal(&s) + err := node.Decode(&s) if err != nil { return err } diff --git a/pkg/smartcontract/param_type.go b/pkg/smartcontract/param_type.go index 80705ed7ea..23bf37347f 100644 --- a/pkg/smartcontract/param_type.go +++ b/pkg/smartcontract/param_type.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/vm/emit" "github.com/nspcc-dev/neo-go/pkg/vm/opcode" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" + "gopkg.in/yaml.v3" ) // ParamType represents the Type of the smart contract parameter. @@ -129,10 +130,10 @@ func (pt ParamType) MarshalYAML() (any, error) { } // UnmarshalYAML implements the YAML Unmarshaler interface. -func (pt *ParamType) UnmarshalYAML(unmarshal func(any) error) error { +func (pt *ParamType) UnmarshalYAML(node *yaml.Node) error { var name string - err := unmarshal(&name) + err := node.Decode(&name) if err != nil { return err } diff --git a/pkg/util/uint160.go b/pkg/util/uint160.go index 6c59d56c0b..49ae0a7584 100644 --- a/pkg/util/uint160.go +++ b/pkg/util/uint160.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/nspcc-dev/neo-go/pkg/io" + "gopkg.in/yaml.v3" ) // Uint160Size is the size of Uint160 in bytes. @@ -147,10 +148,10 @@ func (u Uint160) MarshalJSON() ([]byte, error) { } // UnmarshalYAML implements the YAML Unmarshaler interface. -func (u *Uint160) UnmarshalYAML(unmarshal func(any) error) error { +func (u *Uint160) UnmarshalYAML(node *yaml.Node) error { var s string - err := unmarshal(&s) + err := node.Decode(&s) if err != nil { return err } diff --git a/pkg/util/uint256.go b/pkg/util/uint256.go index b6d7cf68a0..3b391be790 100644 --- a/pkg/util/uint256.go +++ b/pkg/util/uint256.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/nspcc-dev/neo-go/pkg/io" + "gopkg.in/yaml.v3" ) // Uint256Size is the size of Uint256 in bytes. @@ -121,6 +122,25 @@ func (u Uint256) MarshalJSON() ([]byte, error) { return r, nil } +// UnmarshalYAML implements the YAML Unmarshaler interface. +func (u *Uint256) UnmarshalYAML(node *yaml.Node) error { + var s string + + err := node.Decode(&s) + if err != nil { + return err + } + + s = strings.TrimPrefix(s, "0x") + *u, err = Uint256DecodeStringLE(s) + return err +} + +// MarshalYAML implements the YAML marshaller interface. +func (u Uint256) MarshalYAML() (any, error) { + return "0x" + u.StringLE(), nil +} + // Compare performs three-way comparison of two Uint256. Possible output: 1, -1, 0 // // 1 implies u > other. 
diff --git a/pkg/util/uint256_test.go b/pkg/util/uint256_test.go
index 9fd1847393..7d5454a30b 100644
--- a/pkg/util/uint256_test.go
+++ b/pkg/util/uint256_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/util"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"gopkg.in/yaml.v3"
 )
 
 func TestUint256UnmarshalJSON(t *testing.T) {
@@ -27,6 +28,29 @@ func TestUint256UnmarshalJSON(t *testing.T) {
 	assert.Error(t, u2.UnmarshalJSON([]byte("123")))
 }
 
+func TestUint256UnmarshalYAML(t *testing.T) {
+	str := "f037308fa0ab18155bccfc08485468c112409ea5064595699e98c545f245f32d"
+	expected, err := util.Uint256DecodeStringLE(str)
+	require.NoError(t, err)
+
+	// UnmarshalYAML decodes hex-strings.
+	var u1, u2 util.Uint256
+	require.NoError(t, yaml.Unmarshal([]byte(str), &u1))
+	require.Equal(t, expected, u1)
+
+	testserdes.MarshalUnmarshalYAML(t, &expected, &u2)
+
+	// Check 0x-prefixed marshalling.
+	require.NoError(t, yaml.Unmarshal([]byte("0x"+str), &u1))
+	require.Equal(t, expected, u1)
+	actual, err := yaml.Marshal(expected)
+	require.NoError(t, err)
+	require.Equal(t, "0x"+str+"\n", string(actual))
+
+	// Invalid input.
+	assert.Error(t, yaml.Unmarshal([]byte("123"), &u2))
+}
+
 func TestUint256DecodeString(t *testing.T) {
 	hexStr := "f037308fa0ab18155bccfc08485468c112409ea5064595699e98c545f245f32d"
 	val, err := util.Uint256DecodeStringLE(hexStr)
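With both hooks in place, hash-valued config fields can be written as LE hex strings, with or without the 0x prefix. A usage sketch, with a stand-in struct rather than the real config type:

package main

import (
	"fmt"

	"github.com/nspcc-dev/neo-go/pkg/util"
	"gopkg.in/yaml.v3"
)

// trustedHeader is a stand-in for illustration; the node config has its own
// type for this hash/index pair.
type trustedHeader struct {
	Hash  util.Uint256 `yaml:"Hash"`
	Index uint32       `yaml:"Index"`
}

func main() {
	raw := []byte("Hash: 0xf037308fa0ab18155bccfc08485468c112409ea5064595699e98c545f245f32d\nIndex: 14\n")
	var th trustedHeader
	if err := yaml.Unmarshal(raw, &th); err != nil {
		panic(err)
	}
	fmt.Println(th.Index, th.Hash.StringLE())

	// Marshalling always re-emits the 0x prefix, while both prefixed and
	// bare LE strings are accepted on input.
	out, err := yaml.Marshal(th)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}

One edge case worth noting: YAML hands essentially any scalar over as a string, so all validation happens in Uint256DecodeStringLE, which rejects anything that is not exactly 32 hex-encoded bytes; that is what the "Invalid input" check in the test above exercises.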