diff --git a/database/memdb/db.go b/database/memdb/db.go index 87a92ab6ee93..e6da20773ac3 100644 --- a/database/memdb/db.go +++ b/database/memdb/db.go @@ -5,6 +5,7 @@ package memdb import ( "context" + "fmt" "slices" "strings" "sync" @@ -38,6 +39,21 @@ func New() *Database { return NewWithSize(DefaultSize) } +// Copy returns a Database with the same key-value pairs as db +func Copy(db *Database) (*Database, error) { + db.lock.Lock() + defer db.lock.Unlock() + + result := New() + for k, v := range db.db { + if err := result.Put([]byte(k), v); err != nil { + return nil, fmt.Errorf("failed to insert key: %w", err) + } + } + + return result, nil +} + // NewWithSize returns a map pre-allocated to the provided size with the // Database interface methods implemented. func NewWithSize(size int) *Database { diff --git a/vms/avm/block/builder/builder.go b/vms/avm/block/builder/builder.go index 10d25b569a0d..bca0300d5ab4 100644 --- a/vms/avm/block/builder/builder.go +++ b/vms/avm/block/builder/builder.go @@ -174,5 +174,5 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) { return nil, err } - return b.manager.NewBlock(statelessBlk), nil + return b.manager.NewBlock(statelessBlk, b.backend.Ctx.SharedMemory), nil } diff --git a/vms/avm/block/builder/builder_test.go b/vms/avm/block/builder/builder_test.go index 3590ebe5f56d..dfde365529f6 100644 --- a/vms/avm/block/builder/builder_test.go +++ b/vms/avm/block/builder/builder_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "github.com/ava-labs/avalanchego/chains/atomic" "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/codec/codecmock" "github.com/ava-labs/avalanchego/database/memdb" @@ -291,8 +292,11 @@ func TestBuilderBuildBlock(t *testing.T) { manager.EXPECT().VerifyUniqueInputs(preferredID, gomock.Any()).Return(nil) // Assert created block has one tx, tx1, // and other fields are set correctly. - manager.EXPECT().NewBlock(gomock.Any()).DoAndReturn( - func(block *block.StandardBlock) snowman.Block { + manager.EXPECT().NewBlock(gomock.Any(), gomock.Any()).DoAndReturn( + func( + block *block.StandardBlock, + _ atomic.SharedMemory, + ) snowman.Block { require.Len(t, block.Transactions, 1) require.Equal(t, tx1, block.Transactions[0]) require.Equal(t, preferredHeight+1, block.Height()) @@ -350,8 +354,11 @@ func TestBuilderBuildBlock(t *testing.T) { manager.EXPECT().GetState(preferredID).Return(preferredState, true) manager.EXPECT().VerifyUniqueInputs(preferredID, gomock.Any()).Return(nil) // Assert that the created block has the right timestamp - manager.EXPECT().NewBlock(gomock.Any()).DoAndReturn( - func(block *block.StandardBlock) snowman.Block { + manager.EXPECT().NewBlock(gomock.Any(), gomock.Any()).DoAndReturn( + func( + block *block.StandardBlock, + _ atomic.SharedMemory, + ) snowman.Block { require.Equal(t, preferredTimestamp.Unix(), block.Timestamp().Unix()) return nil }, @@ -422,8 +429,11 @@ func TestBuilderBuildBlock(t *testing.T) { manager.EXPECT().GetState(preferredID).Return(preferredState, true) manager.EXPECT().VerifyUniqueInputs(preferredID, gomock.Any()).Return(nil) // Assert that the created block has the right timestamp - manager.EXPECT().NewBlock(gomock.Any()).DoAndReturn( - func(block *block.StandardBlock) snowman.Block { + manager.EXPECT().NewBlock(gomock.Any(), gomock.Any()).DoAndReturn( + func( + block *block.StandardBlock, + _ atomic.SharedMemory, + ) snowman.Block { require.Equal(t, now.Unix(), block.Timestamp().Unix()) return nil }, @@ -526,7 +536,7 @@ func TestBlockBuilderAddLocalTx(t *testing.T) { parentBlk, err := block.NewStandardBlock(parentID, 0, parentTimestamp, txs, cm) require.NoError(err) state.AddBlock(parentBlk) - state.SetLastAccepted(parentBlk.ID()) + state.SetLastAccepted(parentBlk.ID(), parentBlk.Height()) metrics, err := metrics.New(registerer) require.NoError(err) diff --git a/vms/avm/block/executor/block.go b/vms/avm/block/executor/block.go index 3692a5486a81..26130555d8c5 100644 --- a/vms/avm/block/executor/block.go +++ b/vms/avm/block/executor/block.go @@ -36,7 +36,8 @@ var ( // Exported for testing in avm package. type Block struct { block.Block - manager *manager + manager *manager + sharedMemory atomic.SharedMemory } func (b *Block) Verify(context.Context) error { @@ -201,7 +202,7 @@ func (b *Block) Verify(context.Context) error { // Now that the block has been executed, we can add the block data to the // state diff. - stateDiff.SetLastAccepted(blkID) + stateDiff.SetLastAccepted(blkID, b.Height()) stateDiff.AddBlock(b.Block) b.manager.blkIDToState[blkID] = blockState @@ -209,7 +210,7 @@ func (b *Block) Verify(context.Context) error { return nil } -func (b *Block) Accept(context.Context) error { +func (b *Block) Accept(ctx context.Context) error { blkID := b.ID() defer b.manager.free(blkID) @@ -240,7 +241,7 @@ func (b *Block) Accept(context.Context) error { } // Note that this method writes [batch] to the database. - if err := b.manager.backend.Ctx.SharedMemory.Apply(blkState.atomicRequests, batch); err != nil { + if err := b.sharedMemory.Apply(blkState.atomicRequests, batch); err != nil { return fmt.Errorf("failed to apply state diff to shared memory: %w", err) } @@ -248,12 +249,17 @@ func (b *Block) Accept(context.Context) error { return err } + checksum, err := b.manager.state.Checksum(ctx) + if err != nil { + return fmt.Errorf("failed to get checksum: %w", err) + } + b.manager.backend.Ctx.Log.Trace( "accepted block", zap.Stringer("blkID", blkID), zap.Uint64("height", b.Height()), zap.Stringer("parentID", b.Parent()), - zap.Stringer("checksum", b.manager.state.Checksum()), + zap.Stringer("checksum", checksum), ) return nil } diff --git a/vms/avm/block/executor/block_test.go b/vms/avm/block/executor/block_test.go index e757054eb025..908cf1a426f5 100644 --- a/vms/avm/block/executor/block_test.go +++ b/vms/avm/block/executor/block_test.go @@ -683,6 +683,7 @@ func TestBlockAccept(t *testing.T) { }, }, }, + sharedMemory: mockSharedMemory, } }, expectedErr: errTest, @@ -726,6 +727,7 @@ func TestBlockAccept(t *testing.T) { }, }, }, + sharedMemory: mockSharedMemory, } }, expectedErr: errTest, @@ -748,7 +750,7 @@ func TestBlockAccept(t *testing.T) { // because we mock the call to shared memory mockManagerState.EXPECT().CommitBatch().Return(nil, nil) mockManagerState.EXPECT().Abort() - mockManagerState.EXPECT().Checksum().Return(ids.Empty) + mockManagerState.EXPECT().Checksum(gomock.Any()).Return(ids.Empty, nil) mockSharedMemory := atomicmock.NewSharedMemory(ctrl) mockSharedMemory.EXPECT().Apply(gomock.Any(), gomock.Any()).Return(nil) @@ -772,6 +774,7 @@ func TestBlockAccept(t *testing.T) { }, }, }, + sharedMemory: mockSharedMemory, } }, expectedErr: nil, diff --git a/vms/avm/block/executor/executormock/manager.go b/vms/avm/block/executor/executormock/manager.go index 0579a81f9320..0532b574814e 100644 --- a/vms/avm/block/executor/executormock/manager.go +++ b/vms/avm/block/executor/executormock/manager.go @@ -12,6 +12,7 @@ package executormock import ( reflect "reflect" + atomic "github.com/ava-labs/avalanchego/chains/atomic" ids "github.com/ava-labs/avalanchego/ids" snowman "github.com/ava-labs/avalanchego/snow/consensus/snowman" set "github.com/ava-labs/avalanchego/utils/set" @@ -105,17 +106,17 @@ func (mr *ManagerMockRecorder) LastAccepted() *gomock.Call { } // NewBlock mocks base method. -func (m *Manager) NewBlock(arg0 block.Block) snowman.Block { +func (m *Manager) NewBlock(blk block.Block, sm atomic.SharedMemory) snowman.Block { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewBlock", arg0) + ret := m.ctrl.Call(m, "NewBlock", blk, sm) ret0, _ := ret[0].(snowman.Block) return ret0 } // NewBlock indicates an expected call of NewBlock. -func (mr *ManagerMockRecorder) NewBlock(arg0 any) *gomock.Call { +func (mr *ManagerMockRecorder) NewBlock(blk, sm any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewBlock", reflect.TypeOf((*Manager)(nil).NewBlock), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewBlock", reflect.TypeOf((*Manager)(nil).NewBlock), blk, sm) } // Preferred mocks base method. @@ -132,6 +133,18 @@ func (mr *ManagerMockRecorder) Preferred() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Preferred", reflect.TypeOf((*Manager)(nil).Preferred)) } +// SetLastAccepted mocks base method. +func (m *Manager) SetLastAccepted(id ids.ID) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetLastAccepted", id) +} + +// SetLastAccepted indicates an expected call of SetLastAccepted. +func (mr *ManagerMockRecorder) SetLastAccepted(id any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*Manager)(nil).SetLastAccepted), id) +} + // SetPreference mocks base method. func (m *Manager) SetPreference(blkID ids.ID) { m.ctrl.T.Helper() diff --git a/vms/avm/block/executor/manager.go b/vms/avm/block/executor/manager.go index 73a3e366d8dd..2c093cb78158 100644 --- a/vms/avm/block/executor/manager.go +++ b/vms/avm/block/executor/manager.go @@ -31,13 +31,14 @@ type Manager interface { // Returns the ID of the most recently accepted block. LastAccepted() ids.ID + SetLastAccepted(id ids.ID) SetPreference(blkID ids.ID) Preferred() ids.ID GetBlock(blkID ids.ID) (snowman.Block, error) GetStatelessBlock(blkID ids.ID) (block.Block, error) - NewBlock(block.Block) snowman.Block + NewBlock(blk block.Block, sm atomic.SharedMemory) snowman.Block // VerifyTx verifies that the transaction can be issued based on the currently // preferred state. This should *not* be used to verify transactions in a block. @@ -109,6 +110,10 @@ func (m *manager) LastAccepted() ids.ID { return m.lastAccepted } +func (m *manager) SetLastAccepted(blkID ids.ID) { + m.lastAccepted = blkID +} + func (m *manager) SetPreference(blockID ids.ID) { m.preferred = blockID } @@ -122,7 +127,7 @@ func (m *manager) GetBlock(blkID ids.ID) (snowman.Block, error) { if err != nil { return nil, err } - return m.NewBlock(blk), nil + return m.NewBlock(blk, m.backend.Ctx.SharedMemory), nil } func (m *manager) GetStatelessBlock(blkID ids.ID) (block.Block, error) { @@ -134,10 +139,14 @@ func (m *manager) GetStatelessBlock(blkID ids.ID) (block.Block, error) { return m.state.GetBlock(blkID) } -func (m *manager) NewBlock(blk block.Block) snowman.Block { +func (m *manager) NewBlock( + blk block.Block, + sm atomic.SharedMemory, +) snowman.Block { return &Block{ - Block: blk, - manager: m, + Block: blk, + manager: m, + sharedMemory: sm, } } diff --git a/vms/avm/environment_test.go b/vms/avm/environment_test.go index 39e4db0eb764..f311ffd25de4 100644 --- a/vms/avm/environment_test.go +++ b/vms/avm/environment_test.go @@ -25,6 +25,7 @@ import ( "github.com/ava-labs/avalanchego/vms/avm/block/executor" "github.com/ava-labs/avalanchego/vms/avm/config" "github.com/ava-labs/avalanchego/vms/avm/fxs" + "github.com/ava-labs/avalanchego/vms/avm/state" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/avm/txs/txstest" "github.com/ava-labs/avalanchego/vms/components/avax" @@ -113,7 +114,8 @@ func setup(tb testing.TB, c *envConfig) *environment { } vm := &VM{ - Config: vmStaticConfig, + Config: vmStaticConfig, + StateMigration: state.NoMigration{}, } vmDynamicConfig := DefaultConfig @@ -408,12 +410,12 @@ func issueAndAccept( require *require.Assertions, vm *VM, tx *txs.Tx, -) { +) ids.ID { txID, err := vm.issueTxFromRPC(tx) require.NoError(err) require.Equal(tx.ID(), txID) - buildAndAccept(require, vm, txID) + return buildAndAccept(require, vm, txID) } // buildAndAccept expects the context lock not to be held @@ -421,7 +423,7 @@ func buildAndAccept( require *require.Assertions, vm *VM, txID ids.ID, -) { +) ids.ID { msg, err := vm.WaitForEvent(context.Background()) require.NoError(err) require.Equal(common.PendingTxs, msg) @@ -442,4 +444,6 @@ func buildAndAccept( require.NoError(blk.Verify(context.Background())) require.NoError(vm.SetPreference(context.Background(), blk.ID())) require.NoError(blk.Accept(context.Background())) + + return blk.ID() } diff --git a/vms/avm/factory.go b/vms/avm/factory.go index af2d33389c13..7731bc676667 100644 --- a/vms/avm/factory.go +++ b/vms/avm/factory.go @@ -7,6 +7,7 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms" "github.com/ava-labs/avalanchego/vms/avm/config" + "github.com/ava-labs/avalanchego/vms/avm/state" ) var _ vms.Factory = (*Factory)(nil) @@ -16,5 +17,9 @@ type Factory struct { } func (f *Factory) New(logging.Logger) (interface{}, error) { - return &VM{Config: f.Config}, nil + return &VM{ + Config: f.Config, + StateMigration: &state.FirewoodMigration{CommitFrequency: 1_000}, + }, + nil } diff --git a/vms/avm/state/diff.go b/vms/avm/state/diff.go index f7bbdb63bf3f..b33e09f438e5 100644 --- a/vms/avm/state/diff.go +++ b/vms/avm/state/diff.go @@ -38,8 +38,9 @@ type diff struct { addedBlockIDs map[uint64]ids.ID // map of height -> blockID addedBlocks map[ids.ID]block.Block // map of blockID -> block - lastAccepted ids.ID - timestamp time.Time + lastAccepted ids.ID + lastAcceptedHeight uint64 + timestamp time.Time } func NewDiff( @@ -149,8 +150,9 @@ func (d *diff) GetLastAccepted() ids.ID { return d.lastAccepted } -func (d *diff) SetLastAccepted(lastAccepted ids.ID) { +func (d *diff) SetLastAccepted(lastAccepted ids.ID, height uint64) { d.lastAccepted = lastAccepted + d.lastAcceptedHeight = height } func (d *diff) GetTimestamp() time.Time { @@ -178,6 +180,6 @@ func (d *diff) Apply(state Chain) { state.AddBlock(blk) } - state.SetLastAccepted(d.lastAccepted) + state.SetLastAccepted(d.lastAccepted, d.lastAcceptedHeight) state.SetTimestamp(d.timestamp) } diff --git a/vms/avm/state/firewood.go b/vms/avm/state/firewood.go new file mode 100644 index 000000000000..407f7e2b084f --- /dev/null +++ b/vms/avm/state/firewood.go @@ -0,0 +1,131 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package state + +import ( + "context" + "errors" + "fmt" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/firewood" + "github.com/ava-labs/avalanchego/ids" +) + +var atomicTxPrefix = []byte("atomic_tx") + +type firewoodDB struct { + db *firewood.DB + versionDB *versiondb.Database +} + +var _ ChainDB = (*firewoodDB)(nil) + +func (f *firewoodDB) AddAtomicTx(txID ids.ID) { + f.db.Put(firewood.Prefix(atomicTxPrefix, txID[:]), []byte{}) +} + +func (f *firewoodDB) Repair(ctx context.Context, vm VM, s State) error { + lastAcceptedBlk, err := s.GetBlock(s.GetLastAccepted()) + if err != nil { + return fmt.Errorf("getting last accepted block: %w", err) + } + + var replayStartHeight int + if firewoodHeight, ok := f.db.Height(); !ok { + // A height of zero means that we have not flushed any data to firewood yet, + // so we need to replay all blocks including genesis. + replayStartHeight = -1 + } else { + replayStartHeight = int(firewoodHeight) + } + + // Replay any blocks until the last accepted height to synchronize the chain + // and local dbs. + for i := replayStartHeight; i < int(lastAcceptedBlk.Height()); i++ { + blkID, err := s.GetBlockIDAtHeight(uint64(i + 1)) + if err != nil { + return fmt.Errorf("getting block id: %w", err) + } + + blk, err := s.GetBlock(blkID) + if err != nil { + return fmt.Errorf("getting block: %w", err) + } + + if err := vm.Replay(ctx, blk); err != nil { + return fmt.Errorf("replaying block: %w", err) + } + } + + return nil +} + +func (f *firewoodDB) Abort() { + f.db.Abort() +} + +func (f *firewoodDB) CommitBatch(height uint64) ( + database.Batch, + error, +) { + b, err := f.versionDB.CommitBatch() + if err != nil { + return nil, err + } + + return &firewoodBatch{ + Batch: b, + nextStateHeight: height, + firewoodDB: f.db, + }, nil +} + +func (f *firewoodDB) Close(ctx context.Context) error { + return f.db.Close(ctx) +} + +// firewoodBatch wraps the underlying batch to also write to firewood after +// it is written to. +type firewoodBatch struct { + database.Batch + nextStateHeight uint64 + firewoodDB *firewood.DB +} + +func (f *firewoodBatch) Write() error { + chainDBHeight, ok := f.firewoodDB.Height() + if !ok && f.nextStateHeight > 1 { + // This should always be initialized after the first (non-genesis) block is + // accepted. + return errors.New("chain db was not initialized") + } + + if nextFirewoodHeight := chainDBHeight + 1; ok && nextFirewoodHeight != f.nextStateHeight { + // Avoid writing state if the next revision height would be out-of-sync + // because firewood should never be out-of-sync with versiondb after we have + // finished repairing. We should only create one firewood revision per + // block height. + // + // We cannot perform this check if the height is not initialized because + // there is no height in firewood before the genesis block is written. + return fmt.Errorf( + "%w: next height is at %d but next revision would be %d", + errDBsOutOfSync, + f.nextStateHeight, + nextFirewoodHeight, + ) + } + + if err := f.Batch.Write(); err != nil { + return fmt.Errorf("writing versiondb batch: %w", err) + } + + if err := f.firewoodDB.Flush(); err != nil { + return fmt.Errorf("flushing to firewood: %w", err) + } + + return nil +} diff --git a/vms/avm/state/granite.go b/vms/avm/state/granite.go new file mode 100644 index 000000000000..312755f00431 --- /dev/null +++ b/vms/avm/state/granite.go @@ -0,0 +1,43 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package state + +import ( + "context" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/ids" +) + +// Deprecated: after the firewood db migration this type should no longer be +// used. +// +// NoChainDB is used before the Firewood migration when chain state was kept in +// the same database as the rest of the state. +type NoChainDB struct { + VersionDB *versiondb.Database +} + +var _ ChainDB = (*NoChainDB)(nil) + +// AddAtomicTx is a no-op because atomic txs are not a part of chain state +func (*NoChainDB) AddAtomicTx(ids.ID) {} + +// Repair is a no-op because the db is always written atomically with the +// rest of the State. +func (*NoChainDB) Repair(context.Context, VM, State) error { + return nil +} + +// Abort is a no-op because there is no chain-specific db. +func (*NoChainDB) Abort() {} + +func (db *NoChainDB) CommitBatch(uint64) (database.Batch, error) { + return db.VersionDB.CommitBatch() +} + +func (*NoChainDB) Close(context.Context) error { + return nil +} diff --git a/vms/avm/state/migration.go b/vms/avm/state/migration.go new file mode 100644 index 000000000000..dadcdb165ad1 --- /dev/null +++ b/vms/avm/state/migration.go @@ -0,0 +1,445 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package state + +import ( + "errors" + "fmt" + "path/filepath" + "time" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/pebbledb" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/firewood" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/avm/block" + "github.com/ava-labs/avalanchego/vms/components/avax" +) + +var ( + migrationStatusKey = []byte("status") + lastMigratedUTXOKey = []byte("utxo") + lastMigratedTxKey = []byte("tx") + lastMigratedBlockKey = []byte("block") +) + +type Migration interface { + Migrate( + log logging.Logger, + parser block.Parser, + metrics prometheus.Registerer, + prev State, + prevVersionDB *versiondb.Database, + prevUTXODB database.Database, + prevTXDB database.Database, + prevBlockIDDB database.Database, + prevBlockDB database.Database, + prevSingletonDB database.Database, + chainDataDir string, + stopVertexID ids.ID, + genesisTimestamp time.Time, + ) (State, ChainDB, error) +} + +type NoMigration struct{} + +var _ Migration = (*NoMigration)(nil) + +func (NoMigration) Migrate( + _ logging.Logger, + _ block.Parser, + _ prometheus.Registerer, + prev State, + vdb *versiondb.Database, + _ database.Database, + _ database.Database, + _ database.Database, + _ database.Database, + _ database.Database, + _ string, + _ ids.ID, + _ time.Time, +) (State, ChainDB, error) { + return prev, &NoChainDB{VersionDB: vdb}, nil +} + +type FirewoodMigration struct { + CommitFrequency int + + updates int +} + +var _ Migration = (*FirewoodMigration)(nil) + +func (f *FirewoodMigration) Migrate( + log logging.Logger, + parser block.Parser, + metrics prometheus.Registerer, + _ State, + prevVersionDB *versiondb.Database, + prevUTXODB database.Database, + prevTxDB database.Database, + prevBlockIDDB database.Database, + prevBlockDB database.Database, + prevSingletonDB database.Database, + chainDataDir string, + stopVertexID ids.ID, + genesisTimestamp time.Time, +) (State, ChainDB, error) { + next, chainDB, err := newMigrationState(parser, metrics, chainDataDir) + if err != nil { + return nil, nil, fmt.Errorf("initializing state: %w", err) + } + + done, err := next.GetMigrationStatus() + if errors.Is(err, database.ErrNotFound) { + if err := next.PutMigrationStatus(false); err != nil { + return nil, nil, fmt.Errorf("putting migration status: %w", err) + } + } else if err != nil { + return nil, nil, fmt.Errorf("getting state migration status: %w", err) + } + + if done { + log.Debug("skipping state migration") + return next, chainDB, nil + } + + log.Info("starting state migration") + lastMigratedUTXO, err := next.GetLastMigratedUTXO() + if err != nil { + return nil, nil, fmt.Errorf("getting last migrated utxo: %w", err) + } + + log.Debug("deleting utxos") + itr := prevUTXODB.NewIteratorWithStart(lastMigratedUTXO[:]) + for itr.Next() { + utxoID, err := ids.ToID(itr.Key()) + if err != nil { + return nil, nil, fmt.Errorf("parsing utxo id: %w", err) + } + + if err := next.PutLastMigratedUTXO(utxoID); err != nil { + return nil, nil, fmt.Errorf("putting last migrated utxo: %w", err) + } + + if err := prevUTXODB.Delete(itr.Key()); err != nil { + return nil, nil, fmt.Errorf("deleting utxo: %w", err) + } + + ok, err := f.commit(prevVersionDB, next.versionDB) + if err != nil { + return nil, nil, fmt.Errorf("committing db: %w", err) + } + if !ok { + continue + } + + log.Verbo("committed migration progress", + zap.Stringer("utxoID", utxoID), + ) + } + + if err := itr.Error(); err != nil { + return nil, nil, fmt.Errorf("iterating db: %w", err) + } + + log.Debug("deleting txs") + lastMigratedTx, err := next.GetLastMigratedTx() + if err != nil { + return nil, nil, fmt.Errorf("getting last migrated tx: %w", err) + } + + itr = prevTxDB.NewIteratorWithStart(lastMigratedTx[:]) + for itr.Next() { + txID, err := ids.ToID(itr.Key()) + if err != nil { + return nil, nil, fmt.Errorf("parsing tx id: %w", err) + } + + if err := next.PutLastMigratedTx(txID); err != nil { + return nil, nil, fmt.Errorf("putting last migrated tx: %w", err) + } + + if err := prevTxDB.Delete(itr.Key()); err != nil { + return nil, nil, fmt.Errorf("deleting migrated tx: %w", err) + } + + ok, err := f.commit(prevVersionDB, next.versionDB) + if err != nil { + return nil, nil, fmt.Errorf("committing db: %w", err) + } + if !ok { + continue + } + + log.Verbo("committed migration progress", zap.Stringer("txID", txID)) + } + + if err := itr.Error(); err != nil { + return nil, nil, fmt.Errorf("iterating db: %w", err) + } + + log.Debug("migrating blocks") + lastMigratedBlk, err := next.GetLastMigratedBlock() + if err != nil { + return nil, nil, fmt.Errorf("getting last migrated block: %w", err) + } + + itr = prevBlockDB.NewIteratorWithStart(lastMigratedBlk[:]) + for itr.Next() { + blkID, err := ids.ToID(itr.Key()) + if err != nil { + return nil, nil, fmt.Errorf("parsing block id: %w", err) + } + + blk, err := parser.ParseBlock(itr.Value()) + if err != nil { + return nil, nil, fmt.Errorf("parsing block: %w", err) + } + + next.AddBlock(blk) + + if err := next.PutLastMigratedBlock(blkID); err != nil { + return nil, nil, fmt.Errorf("putting last migrated block: %w", err) + } + + if err := prevBlockDB.Delete(itr.Key()); err != nil { + return nil, nil, fmt.Errorf("deleting migrated block: %w", err) + } + + if err := prevBlockIDDB.Delete(database.PackUInt64(blk.Height())); err != nil { + return nil, nil, fmt.Errorf("deleting migrated block: %w", err) + } + + ok, err := f.commit(prevVersionDB, next.versionDB) + if err != nil { + return nil, nil, fmt.Errorf("committing db: %w", err) + } + if !ok { + continue + } + + log.Verbo("committed migration progress", zap.Stringer("blkID", blkID)) + } + + if err := itr.Error(); err != nil { + return nil, nil, fmt.Errorf("iterating db: %w", err) + } + + log.Debug("migrating singletons") + itr = prevSingletonDB.NewIterator() + for itr.Next() { + if err := next.singletonDB.Put(itr.Key(), itr.Value()); err != nil { + return nil, nil, fmt.Errorf("migrating singleton: %w", err) + } + + if err := prevSingletonDB.Delete(itr.Key()); err != nil { + return nil, nil, fmt.Errorf("deleting singleton: %w", err) + } + } + + if err := itr.Error(); err != nil { + return nil, nil, fmt.Errorf("iterating db: %w", err) + } + + if err := next.PutMigrationStatus(true); err != nil { + return nil, nil, fmt.Errorf("putting migration status: %w", err) + } + + if err := next.versionDB.Commit(); err != nil { + return nil, nil, fmt.Errorf("committing state: %w", err) + } + + if err := prevVersionDB.Commit(); err != nil { + return nil, nil, fmt.Errorf("committing state: %w", err) + } + + // It is safe to call InitializeChainState because it was called before the + // migration when creating the previous state - so it is not possible for us + // to create two revisions at genesis. + if err := next.InitializeChainState(stopVertexID, genesisTimestamp); err != nil { + return nil, nil, fmt.Errorf("failed to initialize chain state: %w", err) + } + + log.Info("migration complete") + + return next, chainDB, nil +} + +func (f *FirewoodMigration) commit( + prev *versiondb.Database, + next *versiondb.Database, +) ( + bool, + error, +) { + f.updates++ + if f.updates%f.CommitFrequency != 0 { + return false, nil + } + + if err := next.Commit(); err != nil { + return false, err + } + + if err := prev.Commit(); err != nil { + return false, err + } + + return true, nil +} + +type migrationState struct { + State + versionDB *versiondb.Database + singletonDB database.Database + migrationDB database.Database +} + +func newMigrationState( + parser block.Parser, + metrics prometheus.Registerer, + chainDataDir string, +) (*migrationState, ChainDB, error) { + // The state db is used for state agreed upon by consensus + stateDB, err := firewood.New(filepath.Join(chainDataDir, "state")) + if err != nil { + return nil, nil, fmt.Errorf("initializing state db: %w", err) + } + + // The local db is used for all other data not agreed upon by consensus + localDB, err := pebbledb.New( + filepath.Join(chainDataDir, "local"), + nil, + logging.NoLog{}, + metrics, + ) + if err != nil { + return nil, nil, fmt.Errorf("initializing local db: %w", err) + } + + versionDB := versiondb.New(localDB) + + // Reserve a prefix for future re-indexing + v1PrefixDB := prefixdb.New([]byte("v1"), versionDB) + utxoIndexDB := prefixdb.New([]byte("utxo_index"), v1PrefixDB) + txDB := prefixdb.New([]byte("tx"), v1PrefixDB) + blockIDDB := prefixdb.New([]byte("block_id"), v1PrefixDB) + blockDB := prefixdb.New([]byte("block"), v1PrefixDB) + singletonDB := prefixdb.New([]byte("singleton"), v1PrefixDB) + migrationDB := prefixdb.New([]byte("migration"), versionDB) + chainDB := &firewoodDB{ + db: stateDB, + versionDB: versionDB, + } + + s, err := NewWithFormat( + "foo_state", + chainDB, + localDB, + versionDB, + utxoIndexDB, + txDB, + blockIDDB, + blockDB, + singletonDB, + avax.NewFirewoodUTXODB(stateDB, parser.Codec()), + parser, + metrics, + ) + if err != nil { + return nil, nil, fmt.Errorf("initializing state: %w", err) + } + + return &migrationState{ + State: s, + versionDB: versionDB, + singletonDB: singletonDB, + migrationDB: migrationDB, + }, + chainDB, + nil +} + +func (m *migrationState) GetMigrationStatus() (bool, error) { + return database.GetBool(m.migrationDB, migrationStatusKey) +} + +func (m *migrationState) PutMigrationStatus(done bool) error { + return database.PutBool(m.migrationDB, migrationStatusKey, done) +} + +func (m *migrationState) GetLastMigratedUTXO() (ids.ID, error) { + utxoIDBytes, err := m.migrationDB.Get(lastMigratedUTXOKey) + if errors.Is(err, database.ErrNotFound) { + return ids.ID{}, nil + } + if err != nil { + return ids.ID{}, err + } + + utxoID, err := ids.ToID(utxoIDBytes) + if err != nil { + return ids.ID{}, err + } + + return utxoID, nil +} + +func (m *migrationState) PutLastMigratedUTXO(utxoID ids.ID) error { + return m.migrationDB.Put(lastMigratedUTXOKey, utxoID[:]) +} + +func (m *migrationState) GetLastMigratedTx() (ids.ID, error) { + txIDBytes, err := m.migrationDB.Get(lastMigratedTxKey) + if errors.Is(err, database.ErrNotFound) { + return ids.ID{}, nil + } + if err != nil { + return ids.ID{}, err + } + + txID, err := ids.ToID(txIDBytes) + if err != nil { + return ids.ID{}, err + } + + return txID, nil +} + +func (m *migrationState) PutLastMigratedTx(txID ids.ID) error { + if err := m.migrationDB.Put(lastMigratedTxKey, txID[:]); err != nil { + return err + } + + return nil +} + +func (m *migrationState) GetLastMigratedBlock() (ids.ID, error) { + blkIDBytes, err := m.migrationDB.Get(lastMigratedBlockKey) + if errors.Is(err, database.ErrNotFound) { + return ids.ID{}, nil + } + if err != nil { + return ids.ID{}, err + } + + blkID, err := ids.ToID(blkIDBytes) + if err != nil { + return ids.ID{}, err + } + + return blkID, nil +} + +func (m *migrationState) PutLastMigratedBlock(blkID ids.ID) error { + return m.migrationDB.Put(lastMigratedBlockKey, blkID[:]) +} diff --git a/vms/avm/state/state.go b/vms/avm/state/state.go index 3370bb3baa51..ef24b472e40c 100644 --- a/vms/avm/state/state.go +++ b/vms/avm/state/state.go @@ -4,6 +4,7 @@ package state import ( + "context" "errors" "fmt" "time" @@ -17,6 +18,7 @@ import ( "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/vms/avm/block" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/components/avax" @@ -29,11 +31,14 @@ const ( ) var ( - utxoPrefix = []byte("utxo") - txPrefix = []byte("tx") - blockIDPrefix = []byte("blockID") - blockPrefix = []byte("block") - singletonPrefix = []byte("singleton") + errDBsOutOfSync = errors.New("dbs are out of sync") + + UTXOPrefix = []byte("utxo") + IndexPrefix = []byte("index") + TxPrefix = []byte("tx") + BlockIDPrefix = []byte("blockID") + BlockPrefix = []byte("block") + SingletonPrefix = []byte("singleton") isInitializedKey = []byte{0x00} timestampKey = []byte{0x01} @@ -42,6 +47,29 @@ var ( _ State = (*state)(nil) ) +// VM defines the execution layer that is able to perform the state +// transition defined in a block. +type VM interface { + // Replay applies `b` during [ChainDB.Repair] to get [ChainDB] to have a + // consistent view of `b` as [State]. + Replay(ctx context.Context, b block.Block) error +} + +// ChainDB holds data for the canonical state of the chain and should +// not be used for data derived from state. +type ChainDB interface { + // AddAtomicTx adds an atomic tx to this db. + AddAtomicTx(txID ids.ID) + // Repair repairs ChainDB when it has view inconsistent with State. + Repair(ctx context.Context, vm VM, s State) error + // Abort cancels any pending changes to this db. + Abort() + // CommitBatch returns a batch with any pending changes. + CommitBatch(height uint64) (database.Batch, error) + // Close closes this db and prevents future operations on it. + Close(ctx context.Context) error +} + type ReadOnlyChain interface { avax.UTXOGetter @@ -59,7 +87,7 @@ type Chain interface { AddTx(tx *txs.Tx) AddBlock(block block.Block) - SetLastAccepted(blkID ids.ID) + SetLastAccepted(blkID ids.ID, height uint64) SetTimestamp(t time.Time) } @@ -91,9 +119,9 @@ type State interface { CommitBatch() (database.Batch, error) // Checksum returns the current state checksum. - Checksum() ids.ID + Checksum(ctx context.Context) (ids.ID, error) - Close() error + Close(ctx context.Context) error } /* @@ -112,11 +140,12 @@ type State interface { * '-- lastAcceptedKey -> lastAccepted */ type state struct { - parser block.Parser - db *versiondb.Database + parser block.Parser + chainDB ChainDB + baseLocalDB database.Database + vdb *versiondb.Database modifiedUTXOs map[ids.ID]*avax.UTXO // map of modified UTXOID -> *UTXO if the UTXO is nil, it has been removed - utxoDB database.Database utxoState avax.UTXOState addedTxs map[ids.ID]*txs.Tx // map of txID -> *txs.Tx @@ -133,24 +162,57 @@ type state struct { // [lastAccepted] is the most recently accepted block. lastAccepted, persistedLastAccepted ids.ID + lastAcceptedHeight uint64 timestamp, persistedTimestamp time.Time singletonDB database.Database } +// Deprecated: [NewWithFormat] should be used instead func New( - db *versiondb.Database, + baseDB database.Database, parser block.Parser, metrics prometheus.Registerer, trackChecksums bool, -) (State, error) { - utxoDB := prefixdb.New(utxoPrefix, db) - txDB := prefixdb.New(txPrefix, db) - blockIDDB := prefixdb.New(blockIDPrefix, db) - blockDB := prefixdb.New(blockPrefix, db) - singletonDB := prefixdb.New(singletonPrefix, db) +) (*state, error) { + vdb := versiondb.New(baseDB) + + return NewWithFormat( + "state", + &NoChainDB{VersionDB: vdb}, + baseDB, + vdb, + prefixdb.New(IndexPrefix, vdb), + prefixdb.New(TxPrefix, vdb), + prefixdb.New(BlockIDPrefix, vdb), + prefixdb.New(BlockPrefix, vdb), + prefixdb.New(SingletonPrefix, vdb), + avax.NewUTXODatabase( + prefixdb.New(UTXOPrefix, vdb), + parser.Codec(), + trackChecksums, + ), + parser, + metrics, + ) +} +// NewWithFormat returns a [State] with a defined db format. +func NewWithFormat( + namespace string, + chainDB ChainDB, + localBaseDB database.Database, + vdb *versiondb.Database, + utxoIndexDB database.Database, + txDB database.Database, + blockIDDB database.Database, + blockDB database.Database, + singletonDB database.Database, + utxoDB avax.UTXODB, + parser block.Parser, + metrics prometheus.Registerer, +) (*state, error) { txCache, err := metercacher.New[ids.ID, *txs.Tx]( - "tx_cache", + metric.AppendNamespace(namespace, "tx_cache"), metrics, lru.NewCache[ids.ID, *txs.Tx](txCacheSize), ) @@ -159,7 +221,7 @@ func New( } blockIDCache, err := metercacher.New[uint64, ids.ID]( - "block_id_cache", + metric.AppendNamespace(namespace, "block_id_cache"), metrics, lru.NewCache[uint64, ids.ID](blockIDCacheSize), ) @@ -168,7 +230,7 @@ func New( } blockCache, err := metercacher.New[ids.ID, block.Block]( - "block_cache", + metric.AppendNamespace(namespace, "block_cache"), metrics, lru.NewCache[ids.ID, block.Block](blockCacheSize), ) @@ -176,17 +238,23 @@ func New( return nil, err } - utxoState, err := avax.NewMeteredUTXOState(utxoDB, parser.Codec(), metrics, trackChecksums) + utxoState, err := avax.NewMeteredUTXOState( + namespace, + utxoDB, + utxoIndexDB, + metrics, + ) if err != nil { return nil, err } return &state{ - parser: parser, - db: db, + parser: parser, + chainDB: chainDB, + baseLocalDB: localBaseDB, + vdb: vdb, modifiedUTXOs: make(map[ids.ID]*avax.UTXO), - utxoDB: utxoDB, utxoState: utxoState, addedTxs: make(map[ids.ID]*txs.Tx), @@ -358,7 +426,7 @@ func (s *state) initializeChainState(stopVertexID ids.ID, genesisTimestamp time. return fmt.Errorf("failed to initialize genesis block: %w", err) } - s.SetLastAccepted(genesis.ID()) + s.SetLastAccepted(genesis.ID(), genesis.Height()) s.SetTimestamp(genesis.Timestamp()) s.AddBlock(genesis) @@ -381,8 +449,9 @@ func (s *state) GetLastAccepted() ids.ID { return s.lastAccepted } -func (s *state) SetLastAccepted(lastAccepted ids.ID) { +func (s *state) SetLastAccepted(lastAccepted ids.ID, height uint64) { s.lastAccepted = lastAccepted + s.lastAcceptedHeight = height } func (s *state) GetTimestamp() time.Time { @@ -403,24 +472,28 @@ func (s *state) Commit() error { } func (s *state) Abort() { - s.db.Abort() + s.vdb.Abort() + s.chainDB.Abort() } func (s *state) CommitBatch() (database.Batch, error) { if err := s.write(); err != nil { return nil, err } - return s.db.CommitBatch() + + return s.chainDB.CommitBatch(s.lastAcceptedHeight) } -func (s *state) Close() error { +func (s *state) Close(ctx context.Context) error { return errors.Join( - s.utxoDB.Close(), + s.utxoState.Close(), s.txDB.Close(), s.blockIDDB.Close(), s.blockDB.Close(), s.singletonDB.Close(), - s.db.Close(), + s.vdb.Close(), + s.baseLocalDB.Close(), + s.chainDB.Close(ctx), ) } @@ -455,6 +528,14 @@ func (s *state) writeTxs() error { for txID, tx := range s.addedTxs { txBytes := tx.Bytes() + switch s.addedTxs[txID].Unsigned.(type) { + case *txs.ExportTx, *txs.ImportTx: + // Atomic txs are special-cased to be a part of chain state because + // their representation in atomic memory is not canonical. + s.chainDB.AddAtomicTx(txID) + default: + } + delete(s.addedTxs, txID) s.txCache.Put(txID, tx) if err := s.txDB.Put(txID[:], txBytes); err != nil { @@ -506,6 +587,6 @@ func (s *state) writeMetadata() error { return nil } -func (s *state) Checksum() ids.ID { +func (s *state) Checksum(context.Context) (ids.ID, error) { return s.utxoState.Checksum() } diff --git a/vms/avm/state/state_test.go b/vms/avm/state/state_test.go index 05fff1f4e402..6b20a105e6df 100644 --- a/vms/avm/state/state_test.go +++ b/vms/avm/state/state_test.go @@ -4,6 +4,7 @@ package state import ( + "path/filepath" "testing" "time" @@ -12,7 +13,9 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/database/versiondb" + "github.com/ava-labs/avalanchego/firewood" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/upgrade" "github.com/ava-labs/avalanchego/vms/avm/block" @@ -306,7 +309,7 @@ func TestInitializeChainState(t *testing.T) { require.NoError(err) s.AddBlock(childBlock) - s.SetLastAccepted(childBlock.ID()) + s.SetLastAccepted(childBlock.ID(), childBlock.Hght) require.NoError(s.Commit()) require.NoError(s.InitializeChainState(stopVertexID, genesisTimestamp)) @@ -316,3 +319,118 @@ func TestInitializeChainState(t *testing.T) { require.NoError(err) require.Equal(genesis.ID(), lastAccepted.Parent()) } + +// Tests that trying to call State.Commit will error if it causes firewood to +// have a height inconsistent with the rest of State. +func TestFirewoodInconsistentHeight(t *testing.T) { + db := memdb.New() + vdb := versiondb.New(db) + + firewood, err := firewood.New(filepath.Join(t.TempDir(), "state")) + require.NoError(t, err) + + s, err := NewWithFormat( + "foobar", + &firewoodDB{db: firewood, versionDB: vdb}, + db, + vdb, + prefixdb.New([]byte("utxo_index"), vdb), + prefixdb.New([]byte("tx"), vdb), + prefixdb.New([]byte("block_id"), vdb), + prefixdb.New([]byte("block_db"), vdb), + prefixdb.New([]byte("singleton"), vdb), + avax.NewFirewoodUTXODB(firewood, parser.Codec()), + parser, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + stopVertexID := ids.GenerateTestID() + genesisTimestamp := upgrade.InitiallyActiveTime + require.NoError(t, s.InitializeChainState(stopVertexID, genesisTimestamp)) + + err = s.Commit() + require.ErrorIs(t, err, errDBsOutOfSync) +} + +// TestFirewoodRootUpdate tests that the state root is updated. The state root +// is always expected to be updated because even when no state is updated, we +// include the block height as part of state. +func TestFirewoodRootUpdate(t *testing.T) { + tests := []struct { + name string + block block.Block + tx *txs.Tx + utxo *avax.UTXO + }{ + { + name: "block added", + block: &block.StandardBlock{}, + }, + { + name: "non-atomic tx added", + tx: &txs.Tx{Unsigned: &txs.BaseTx{}}, + }, + { + name: "atomic tx added", + tx: &txs.Tx{Unsigned: &txs.ExportTx{}}, + }, + { + name: "utxo added", + utxo: &avax.UTXO{Out: &secp256k1fx.TransferOutput{}}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + db := memdb.New() + vdb := versiondb.New(db) + + firewood, err := firewood.New(filepath.Join(t.TempDir(), "state")) + require.NoError(t, err) + + s, err := NewWithFormat( + "foobar", + &firewoodDB{db: firewood, versionDB: vdb}, + db, + vdb, + prefixdb.New([]byte("utxo_index"), vdb), + prefixdb.New([]byte("tx"), vdb), + prefixdb.New([]byte("block_id"), vdb), + prefixdb.New([]byte("block_db"), vdb), + prefixdb.New([]byte("singleton"), vdb), + avax.NewFirewoodUTXODB(firewood, parser.Codec()), + parser, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + + stopVertexID := ids.GenerateTestID() + genesisTimestamp := upgrade.InitiallyActiveTime + require.NoError(t, s.InitializeChainState(stopVertexID, genesisTimestamp)) + + if tt.block != nil { + s.AddBlock(tt.block) + } + + if tt.tx != nil { + s.AddTx(tt.tx) + } + + if tt.utxo != nil { + s.AddUTXO(tt.utxo) + } + + s.SetLastAccepted(ids.GenerateTestID(), 1) + + prev, err := s.Checksum(t.Context()) + require.NoError(t, err) + + require.NoError(t, s.Commit()) + + next, err := s.Checksum(t.Context()) + require.NoError(t, err) + require.NotEqual(t, prev, next) + }) + } +} diff --git a/vms/avm/state/statemock/chain.go b/vms/avm/state/statemock/chain.go index 58f4452fd122..d32389e33792 100644 --- a/vms/avm/state/statemock/chain.go +++ b/vms/avm/state/statemock/chain.go @@ -181,15 +181,15 @@ func (mr *ChainMockRecorder) GetUTXO(utxoID any) *gomock.Call { } // SetLastAccepted mocks base method. -func (m *Chain) SetLastAccepted(blkID ids.ID) { +func (m *Chain) SetLastAccepted(blkID ids.ID, height uint64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetLastAccepted", blkID) + m.ctrl.Call(m, "SetLastAccepted", blkID, height) } // SetLastAccepted indicates an expected call of SetLastAccepted. -func (mr *ChainMockRecorder) SetLastAccepted(blkID any) *gomock.Call { +func (mr *ChainMockRecorder) SetLastAccepted(blkID, height any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*Chain)(nil).SetLastAccepted), blkID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*Chain)(nil).SetLastAccepted), blkID, height) } // SetTimestamp mocks base method. diff --git a/vms/avm/state/statemock/diff.go b/vms/avm/state/statemock/diff.go index 35724dfdb457..fbefd75f5ee0 100644 --- a/vms/avm/state/statemock/diff.go +++ b/vms/avm/state/statemock/diff.go @@ -194,15 +194,15 @@ func (mr *DiffMockRecorder) GetUTXO(utxoID any) *gomock.Call { } // SetLastAccepted mocks base method. -func (m *Diff) SetLastAccepted(blkID ids.ID) { +func (m *Diff) SetLastAccepted(blkID ids.ID, height uint64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetLastAccepted", blkID) + m.ctrl.Call(m, "SetLastAccepted", blkID, height) } // SetLastAccepted indicates an expected call of SetLastAccepted. -func (mr *DiffMockRecorder) SetLastAccepted(blkID any) *gomock.Call { +func (mr *DiffMockRecorder) SetLastAccepted(blkID, height any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*Diff)(nil).SetLastAccepted), blkID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*Diff)(nil).SetLastAccepted), blkID, height) } // SetTimestamp mocks base method. diff --git a/vms/avm/state/statemock/state.go b/vms/avm/state/statemock/state.go index 1ab1dae38700..b91dbfb2fbe6 100644 --- a/vms/avm/state/statemock/state.go +++ b/vms/avm/state/statemock/state.go @@ -10,6 +10,7 @@ package statemock import ( + context "context" reflect "reflect" time "time" @@ -94,31 +95,32 @@ func (mr *StateMockRecorder) AddUTXO(utxo any) *gomock.Call { } // Checksum mocks base method. -func (m *State) Checksum() ids.ID { +func (m *State) Checksum(ctx context.Context) (ids.ID, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Checksum") + ret := m.ctrl.Call(m, "Checksum", ctx) ret0, _ := ret[0].(ids.ID) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // Checksum indicates an expected call of Checksum. -func (mr *StateMockRecorder) Checksum() *gomock.Call { +func (mr *StateMockRecorder) Checksum(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Checksum", reflect.TypeOf((*State)(nil).Checksum)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Checksum", reflect.TypeOf((*State)(nil).Checksum), ctx) } // Close mocks base method. -func (m *State) Close() error { +func (m *State) Close(ctx context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") + ret := m.ctrl.Call(m, "Close", ctx) ret0, _ := ret[0].(error) return ret0 } // Close indicates an expected call of Close. -func (mr *StateMockRecorder) Close() *gomock.Call { +func (mr *StateMockRecorder) Close(ctx any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*State)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*State)(nil).Close), ctx) } // Commit mocks base method. @@ -294,15 +296,15 @@ func (mr *StateMockRecorder) SetInitialized() *gomock.Call { } // SetLastAccepted mocks base method. -func (m *State) SetLastAccepted(blkID ids.ID) { +func (m *State) SetLastAccepted(blkID ids.ID, height uint64) { m.ctrl.T.Helper() - m.ctrl.Call(m, "SetLastAccepted", blkID) + m.ctrl.Call(m, "SetLastAccepted", blkID, height) } // SetLastAccepted indicates an expected call of SetLastAccepted. -func (mr *StateMockRecorder) SetLastAccepted(blkID any) *gomock.Call { +func (mr *StateMockRecorder) SetLastAccepted(blkID, height any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*State)(nil).SetLastAccepted), blkID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetLastAccepted", reflect.TypeOf((*State)(nil).SetLastAccepted), blkID, height) } // SetTimestamp mocks base method. diff --git a/vms/avm/vm.go b/vms/avm/vm.go index 35b8825530d4..eae6b5a0e9f6 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -16,7 +16,9 @@ import ( "go.uber.org/zap" "github.com/ava-labs/avalanchego/api/metrics" + "github.com/ava-labs/avalanchego/chains/atomic" "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" @@ -51,6 +53,7 @@ var ( errGenesisAssetMustHaveState = errors.New("genesis asset must have non-empty state") _ vertex.LinearizableVMWithEngine = (*VM)(nil) + _ state.VM = (*VM)(nil) ) type VM struct { @@ -80,12 +83,13 @@ type VM struct { // State management state state.State + // Granite state + graniteState state.State // asset id that will be used for fees feeAssetID ids.ID - baseDB database.Database - db *versiondb.Database + db *versiondb.Database typeToFxIndex map[reflect.Type]int fxs []*extensions.ParsedFx @@ -105,6 +109,15 @@ type VM struct { blockbuilder.Builder chainManager blockexecutor.Manager network *network.Network + + StateMigration state.Migration + + utxoDB database.Database + indexDB database.Database + txDB database.Database + blockIDDB database.Database + blockDB database.Database + singletonDB database.Database } func (vm *VM) Connected(ctx context.Context, nodeID ids.NodeID, version *version.Application) error { @@ -172,8 +185,6 @@ func (vm *VM) Initialize( vm.ctx = ctx vm.appSender = appSender - vm.baseDB = db - vm.db = versiondb.New(db) typedFxs := make([]extensions.Fx, len(fxs)) vm.fxs = make([]*extensions.ParsedFx, len(fxs)) @@ -206,17 +217,34 @@ func (vm *VM) Initialize( codec := vm.parser.Codec() vm.Spender = utxo.NewSpender(&vm.clock, codec) - state, err := state.New( + vm.db = versiondb.New(db) + vm.utxoDB = prefixdb.New(state.UTXOPrefix, vm.db) + vm.indexDB = prefixdb.New(state.IndexPrefix, vm.db) + vm.txDB = prefixdb.New(state.TxPrefix, vm.db) + vm.blockIDDB = prefixdb.New(state.BlockIDPrefix, vm.db) + vm.blockDB = prefixdb.New(state.BlockPrefix, vm.db) + vm.singletonDB = prefixdb.New(state.SingletonPrefix, vm.db) + chainDB := &state.NoChainDB{VersionDB: vm.db} + + vm.graniteState, err = state.NewWithFormat( + "state", + chainDB, + db, vm.db, + vm.indexDB, + vm.txDB, + vm.blockIDDB, + vm.blockDB, + vm.singletonDB, + avax.NewUTXODatabase(vm.utxoDB, vm.parser.Codec(), avmConfig.ChecksumsEnabled), vm.parser, vm.registerer, - avmConfig.ChecksumsEnabled, ) if err != nil { return err } - vm.state = state + vm.state = vm.graniteState if err := vm.initGenesis(genesisBytes); err != nil { return err @@ -272,7 +300,7 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { } } -func (vm *VM) Shutdown(context.Context) error { +func (vm *VM) Shutdown(ctx context.Context) error { if vm.state == nil { return nil } @@ -280,10 +308,7 @@ func (vm *VM) Shutdown(context.Context) error { vm.onShutdownCtxCancel() vm.awaitShutdown.Wait() - return errors.Join( - vm.state.Close(), - vm.baseDB.Close(), - ) + return vm.state.Close(ctx) } func (*VM) Version(context.Context) (string, error) { @@ -336,7 +361,7 @@ func (vm *VM) ParseBlock(_ context.Context, blkBytes []byte) (snowman.Block, err if err != nil { return nil, err } - return vm.chainManager.NewBlock(blk), nil + return vm.chainManager.NewBlock(blk, vm.ctx.SharedMemory), nil } func (vm *VM) SetPreference(_ context.Context, blkID ids.ID) error { @@ -352,6 +377,44 @@ func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, erro return vm.state.GetBlockIDAtHeight(height) } +func (vm *VM) Replay(ctx context.Context, b block.Block) error { + if b.Height() == 0 { + // The genesis block is special-cased and cannot be verified, so add this to + // state directly. + vm.state.AddBlock(b) + vm.state.SetTimestamp(b.Timestamp()) + vm.state.SetLastAccepted(b.ID(), b.Height()) + + vm.chainManager.SetLastAccepted(b.ID()) + vm.chainManager.SetPreference(b.ID()) + + // We have to call Commit here to guarantee that a revision is created for + // genesis. + if err := vm.state.Commit(); err != nil { + return fmt.Errorf("failed to commit genesis block: %w", err) + } + + return nil + } + + blk := vm.chainManager.NewBlock( + b, + // Avoid writing to shared memory because we already wrote to it when we first + // accepted the block. + atomic.NewReadOnly(vm.ctx.SharedMemory), + ) + + if err := blk.Verify(ctx); err != nil { + return fmt.Errorf("verifying block %s: %w", blk.ID(), err) + } + + if err := blk.Accept(ctx); err != nil { + return fmt.Errorf("verifying block %s: %w", blk.ID(), err) + } + + return nil +} + /* ****************************************************************************** *********************************** DAG VM *********************************** @@ -360,10 +423,35 @@ func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, erro func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID) error { time := vm.Config.Upgrades.CortinaTime - if err := vm.state.InitializeChainState(stopVertexID, time); err != nil { - return fmt.Errorf("failed to initialize chain state: %w", err) + + if err := vm.graniteState.InitializeChainState( + stopVertexID, + time, + ); err != nil { + return fmt.Errorf("failed to initialize legacy chain state: %w", err) } + state, chainDB, err := vm.StateMigration.Migrate( + vm.ctx.Log, + vm.parser, + vm.registerer, + vm.state, + vm.db, + vm.utxoDB, + vm.txDB, + vm.blockIDDB, + vm.blockDB, + vm.singletonDB, + vm.ctx.ChainDataDir, + stopVertexID, + time, + ) + if err != nil { + return fmt.Errorf("failed to migrate state: %w", err) + } + + vm.state = state + mempool, err := xmempool.New("mempool", vm.registerer) if err != nil { return fmt.Errorf("failed to create mempool: %w", err) @@ -378,6 +466,10 @@ func (vm *VM) Linearize(ctx context.Context, stopVertexID ids.ID) error { vm.onAccept, ) + if err := chainDB.Repair(ctx, vm, vm.state); err != nil { + return fmt.Errorf("repairing chain state: %w", err) + } + vm.Builder = blockbuilder.New( vm.txBackend, vm.chainManager, @@ -455,6 +547,14 @@ func (vm *VM) ParseTx(_ context.Context, bytes []byte) (snowstorm.Tx, error) { }, nil } +func (vm *VM) GetTx(txID ids.ID) (*txs.Tx, error) { + return vm.state.GetTx(txID) +} + +func (vm *VM) GetUTXO(utxoID ids.ID) (*avax.UTXO, error) { + return vm.state.GetUTXO(utxoID) +} + /* ****************************************************************************** ********************************** JSON API ********************************** diff --git a/vms/avm/vm_test.go b/vms/avm/vm_test.go index bd0600e78a84..b87d43d3d810 100644 --- a/vms/avm/vm_test.go +++ b/vms/avm/vm_test.go @@ -4,6 +4,7 @@ package avm import ( + "encoding/json" "math" "testing" @@ -14,12 +15,17 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/upgrade/upgradetest" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" + "github.com/ava-labs/avalanchego/vms/avm/config" + "github.com/ava-labs/avalanchego/vms/avm/state" "github.com/ava-labs/avalanchego/vms/avm/txs" + "github.com/ava-labs/avalanchego/vms/avm/txs/txstest" "github.com/ava-labs/avalanchego/vms/components/avax" "github.com/ava-labs/avalanchego/vms/components/verify" "github.com/ava-labs/avalanchego/vms/nftfx" @@ -30,7 +36,7 @@ import ( func TestInvalidGenesis(t *testing.T) { require := require.New(t) - vm := &VM{} + vm := &VM{StateMigration: state.NoMigration{}} ctx := snowtest.Context(t, snowtest.XChainID) ctx.Lock.Lock() defer ctx.Lock.Unlock() @@ -702,3 +708,270 @@ func TestClearForceAcceptedExportTx(t *testing.T) { _, err = peerSharedMemory.Get(env.vm.ctx.ChainID, [][]byte{utxoID[:]}) require.ErrorIs(err, database.ErrNotFound) } + +// Tests that VM.Linearize migrates existing state to merkle-ized state +func TestVMLinearizeStateMigration(t *testing.T) { + require := require.New(t) + + db := memdb.New() + + snowCtx := snowtest.Context(t, snowtest.XChainID) + snowCtx.SharedMemory = atomic.NewMemory(db).NewSharedMemory(ids.Empty) + + configBytes, err := json.Marshal(DefaultConfig) + require.NoError(err) + + fxs := []*common.Fx{{ID: secp256k1fx.ID, Fx: &secp256k1fx.Fx{}}} + + config := config.Config{ + Upgrades: upgradetest.GetConfig(upgradetest.Latest), + } + + vm := &VM{ + Config: config, + StateMigration: state.NoMigration{}, + } + + genesisData := makeDefaultGenesisData(t) + genesis, err := NewGenesis( + constants.UnitTestID, + genesisData, + ) + require.NoError(err) + genesisBytes, err := genesis.Bytes() + require.NoError(err) + + // Start the VM and create some initial state that needs to be migrated + require.NoError(vm.Initialize( + t.Context(), + snowCtx, + db, + genesisBytes, + nil, + configBytes, + fxs, + &enginetest.Sender{}, + )) + require.NoError(vm.SetState(t.Context(), snow.Bootstrapping)) + + genesisTx := getCreateTxFromGenesisTest(t, genesisBytes, genesis.Txs[0].Name) + + txBuilder := txstest.New( + vm.parser.Codec(), + vm.ctx, + &vm.Config, + vm.ctx.AVAXAssetID, + vm.state, + ) + + key := keys[0] + kc := secp256k1fx.NewKeychain(key) + + firstTx, err := txBuilder.BaseTx( + []*avax.TransferableOutput{ + { + Asset: avax.Asset{ID: genesisTx.ID()}, + Out: &secp256k1fx.TransferOutput{ + Amt: 1, + OutputOwners: secp256k1fx.OutputOwners{ + Threshold: 1, + Addrs: []ids.ShortID{key.PublicKey().Address()}, + }, + }, + }, + }, + nil, + kc, + key.Address(), + ) + require.NoError(err) + + stopVtx, err := vm.ParseTx(t.Context(), firstTx.Bytes()) + require.NoError(err) + require.NoError(stopVtx.Verify(t.Context())) + require.NoError(stopVtx.Accept(t.Context())) + + require.NoError(vm.Linearize(t.Context(), stopVtx.ID())) + require.NoError(vm.SetState(t.Context(), snow.NormalOp)) + + genesisBlkID, err := vm.LastAccepted(t.Context()) + require.NoError(err) + + var ( + wantBlkIDs = []ids.ID{genesisBlkID} + wantTxIDs []ids.ID + wantConsumedUTXOs []*avax.UTXO + wantProducedUTXOs []*avax.UTXO + ) + + // Create an asset and export it into shared memory + tx, err := txBuilder.CreateAssetTx( + "FOO", + "BAR", + 0, + map[uint32][]verify.State{ + 0: { + &secp256k1fx.TransferOutput{ + Amt: 100, + OutputOwners: secp256k1fx.OutputOwners{ + Threshold: 1, + Addrs: []ids.ShortID{keys[0].Address()}, + }, + }, + }, + }, + kc, + keys[0].Address(), + ) + require.NoError(err) + blkID := issueAndAccept(require, vm, tx) + + wantTxIDs = append(wantTxIDs, tx.ID()) + wantConsumedUTXOs = append(wantConsumedUTXOs, tx.UTXOs()...) + + wantBlkIDs = append(wantBlkIDs, blkID) + + atomicTx, err := txBuilder.ExportTx( + snowtest.PChainID, + ids.GenerateTestShortID(), + tx.ID(), + 1, + kc, + keys[0].Address(), + ) + require.NoError(err) + blkID = issueAndAccept(require, vm, atomicTx) + + wantTxIDs = append(wantTxIDs, atomicTx.ID()) + wantProducedUTXOs = append(wantProducedUTXOs, atomicTx.UTXOs()...) + + wantBlkIDs = append(wantBlkIDs, blkID) + + // Genesis block + 2 blocks that were built + require.Len(wantBlkIDs, 3) + + db, err = memdb.Copy(db) + require.NoError(err) + require.NoError(vm.Shutdown(t.Context())) + + // Restart the VM with the migration enabled and verify that all state is + // migrated into the new database format + snowCtx = snowtest.Context(t, snowtest.XChainID) + snowCtx.SharedMemory = atomic.NewMemory(db).NewSharedMemory(ids.Empty) + + vm = &VM{ + Config: config, + StateMigration: &state.FirewoodMigration{CommitFrequency: 1}, + } + + require.NoError(vm.Initialize( + t.Context(), + snowCtx, + db, + genesisBytes, + nil, + configBytes, + fxs, + &enginetest.Sender{}, + )) + require.NoError(vm.SetState(t.Context(), snow.Bootstrapping)) + require.NoError(vm.Linearize(t.Context(), stopVtx.ID())) + require.NoError(vm.SetState(t.Context(), snow.NormalOp)) + + // Check that all previous state still exists + for height, wantBlkID := range wantBlkIDs { + gotBlkID, err := vm.GetBlockIDAtHeight(t.Context(), uint64(height)) + require.NoError(err) + require.Equal(wantBlkID, gotBlkID) + + gotBlk, err := vm.GetBlock(t.Context(), wantBlkID) + require.NoError(err) + require.Equal(wantBlkID, gotBlk.ID()) + } + + for _, txID := range wantTxIDs { + gotTx, err := vm.GetTx(txID) + require.NoError(err) + require.Equal(txID, gotTx.ID()) + } + + for _, utxo := range wantConsumedUTXOs { + _, err := vm.GetUTXO(utxo.InputID()) + // Consumed UTXOs should be removed from state + require.ErrorIs(err, database.ErrNotFound) + } + + for _, utxo := range wantProducedUTXOs { + gotUTXO, err := vm.GetUTXO(utxo.InputID()) + require.NoError(err) + require.Equal(utxo.InputID(), gotUTXO.InputID()) + } + + wantLastAcceptedBlkID := wantBlkIDs[len(wantBlkIDs)-1] + gotLastAcceptedBlkID, err := vm.LastAccepted(t.Context()) + require.NoError(err) + require.Equal(wantLastAcceptedBlkID, gotLastAcceptedBlkID) + + db, err = memdb.Copy(db) + require.NoError(err) + require.NoError(vm.Shutdown(t.Context())) + + // Restart the VM using the old state format and verify that all the previous + // data was deleted + snowCtx = snowtest.Context(t, snowtest.XChainID) + snowCtx.SharedMemory = atomic.NewMemory(db).NewSharedMemory(ids.Empty) + + vm = &VM{ + Config: config, + StateMigration: state.NoMigration{}, + } + require.NoError(vm.Initialize( + t.Context(), + snowCtx, + db, + genesisBytes, + nil, + configBytes, + fxs, + &enginetest.Sender{}, + )) + require.NoError(vm.SetState(t.Context(), snow.Bootstrapping)) + require.NoError(vm.Linearize(t.Context(), stopVtx.ID())) + require.NoError(vm.SetState(t.Context(), snow.NormalOp)) + + // The genesis block should always be present because it is initialized as + // part of the VM initial state. + _, err = vm.GetBlock(t.Context(), genesisBlkID) + require.NoError(err) + _, err = vm.GetBlockIDAtHeight(t.Context(), 0) + require.NoError(err) + + // Any blocks after genesis should be deleted because they exist in the new + // state format. + for height, wantBlkID := range wantBlkIDs { + if height == 0 { + continue + } + + _, err := vm.GetBlock(t.Context(), wantBlkID) + require.ErrorIs(err, database.ErrNotFound) + + _, err = vm.GetBlockIDAtHeight(t.Context(), uint64(height)) + require.ErrorIs(err, database.ErrNotFound) + } + + for _, txID := range wantTxIDs { + _, err := vm.GetTx(txID) + require.ErrorIs(err, database.ErrNotFound) + } + + for _, utxo := range wantConsumedUTXOs { + _, err := vm.GetUTXO(utxo.InputID()) + require.ErrorIs(err, database.ErrNotFound) + } + + for _, utxo := range wantProducedUTXOs { + _, err := vm.GetUTXO(utxo.InputID()) + require.ErrorIs(err, database.ErrNotFound) + } +} diff --git a/vms/components/avax/firewood.go b/vms/components/avax/firewood.go new file mode 100644 index 000000000000..615276755f2e --- /dev/null +++ b/vms/components/avax/firewood.go @@ -0,0 +1,74 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package avax + +import ( + "github.com/ava-labs/avalanchego/codec" + "github.com/ava-labs/avalanchego/firewood" + "github.com/ava-labs/avalanchego/ids" +) + +var utxoPrefix = []byte("utxo") + +type FirewoodUTXODB struct { + db *firewood.DB + codec codec.Manager +} + +var _ UTXODB = (*FirewoodUTXODB)(nil) + +func NewFirewoodUTXODB(db *firewood.DB, codec codec.Manager) *FirewoodUTXODB { + return &FirewoodUTXODB{ + db: db, + codec: codec, + } +} + +func (f *FirewoodUTXODB) Get(inputID ids.ID) (*UTXO, error) { + bytes, err := f.db.Get(firewood.Prefix(utxoPrefix, inputID[:])) + if err != nil { + return nil, err + } + + u := &UTXO{} + if _, err := f.codec.Unmarshal(bytes, u); err != nil { + return nil, err + } + + return u, nil +} + +func (f *FirewoodUTXODB) Put(u *UTXO) error { + bytes, err := f.codec.Marshal(codecVersion, u) + if err != nil { + return err + } + + inputID := u.InputID() + f.db.Put(firewood.Prefix(utxoPrefix, inputID[:]), bytes) + return nil +} + +func (f *FirewoodUTXODB) Delete(key []byte) error { + f.db.Delete(firewood.Prefix(utxoPrefix, key)) + return nil +} + +// InitChecksum is a no-op because firewood already initializes the merkle root. +func (*FirewoodUTXODB) InitChecksum() error { + return nil +} + +// UpdateChecksum is a no-op because firewood already updates the merkle root. +func (*FirewoodUTXODB) UpdateChecksum(ids.ID) {} + +func (f *FirewoodUTXODB) Checksum() (ids.ID, error) { + return f.db.Root() +} + +// Close is a no-op because this is a subset of firewoodDB which performs +// Close. +func (*FirewoodUTXODB) Close() error { + return nil +} diff --git a/vms/components/avax/utxo_fetching_test.go b/vms/components/avax/utxo_fetching_test.go index 162e4caa7494..842b59815ad2 100644 --- a/vms/components/avax/utxo_fetching_test.go +++ b/vms/components/avax/utxo_fetching_test.go @@ -11,6 +11,7 @@ import ( "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/codec/linearcodec" "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/vms/secp256k1fx" @@ -45,8 +46,12 @@ func TestFetchUTXOs(t *testing.T) { require.NoError(c.RegisterType(&secp256k1fx.TransferOutput{})) require.NoError(manager.RegisterCodec(codecVersion, c)) - db := memdb.New() - s, err := NewUTXOState(db, manager, trackChecksum) + s, err := NewUTXOState( + memdb.New(), + memdb.New(), + manager, + trackChecksum, + ) require.NoError(err) require.NoError(s.PutUTXO(utxo)) @@ -79,7 +84,12 @@ func TestGetPaginatedUTXOs(t *testing.T) { require.NoError(manager.RegisterCodec(codecVersion, c)) db := memdb.New() - s, err := NewUTXOState(db, manager, trackChecksum) + s, err := NewUTXOState( + prefixdb.New([]byte("foo"), db), + prefixdb.New([]byte("bar"), db), + manager, + trackChecksum, + ) require.NoError(err) // Create 1000 UTXOs each on addr0, addr1, and addr2. diff --git a/vms/components/avax/utxo_state.go b/vms/components/avax/utxo_state.go index 0f36215eb718..b645ee66bd8b 100644 --- a/vms/components/avax/utxo_state.go +++ b/vms/components/avax/utxo_state.go @@ -4,6 +4,8 @@ package avax import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/avalanchego/cache" @@ -14,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/database/linkeddb" "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/metric" ) const ( @@ -21,11 +24,6 @@ const ( indexCacheSize = 64 ) -var ( - utxoPrefix = []byte("utxo") - indexPrefix = []byte("index") -) - // UTXOState is a thin wrapper around a database to provide, caching, // serialization, and de-serialization for UTXOs. type UTXOState interface { @@ -33,7 +31,8 @@ type UTXOState interface { UTXOWriter // Checksum returns the current UTXOChecksum. - Checksum() ids.ID + Checksum() (ids.ID, error) + Close() error } // UTXOReader is a thin wrapper around a database to provide fetching of UTXOs. @@ -71,47 +70,50 @@ type UTXOWriter interface { DeleteUTXO(utxoID ids.ID) error } -type utxoState struct { - codec codec.Manager +// UTXODB is the database where utxos are persisted +type UTXODB interface { + Get(inputID ids.ID) (*UTXO, error) + Put(u *UTXO) error + Delete(key []byte) error + InitChecksum() error + UpdateChecksum(utxoID ids.ID) + Checksum() (ids.ID, error) + Close() error +} +type utxoState struct { // UTXO ID -> *UTXO. If the *UTXO is nil the UTXO doesn't exist utxoCache cache.Cacher[ids.ID, *UTXO] - utxoDB database.Database + utxoDB UTXODB indexDB database.Database indexCache cache.Cacher[string, linkeddb.LinkedDB] - - trackChecksum bool - checksum ids.ID } func NewUTXOState( - db database.Database, + utxoDB database.Database, + indexDB database.Database, codec codec.Manager, trackChecksum bool, ) (UTXOState, error) { s := &utxoState{ - codec: codec, - utxoCache: lru.NewCache[ids.ID, *UTXO](utxoCacheSize), - utxoDB: prefixdb.New(utxoPrefix, db), + utxoDB: NewUTXODatabase(utxoDB, codec, trackChecksum), - indexDB: prefixdb.New(indexPrefix, db), + indexDB: indexDB, indexCache: lru.NewCache[string, linkeddb.LinkedDB](indexCacheSize), - - trackChecksum: trackChecksum, } - return s, s.initChecksum() + return s, s.utxoDB.InitChecksum() } func NewMeteredUTXOState( - db database.Database, - codec codec.Manager, + namespace string, + utxoDB UTXODB, + indexDB database.Database, metrics prometheus.Registerer, - trackChecksum bool, ) (UTXOState, error) { utxoCache, err := metercacher.New[ids.ID, *UTXO]( - "utxo_cache", + metric.AppendNamespace(namespace, "utxo_cache"), metrics, lru.NewCache[ids.ID, *UTXO](utxoCacheSize), ) @@ -120,7 +122,7 @@ func NewMeteredUTXOState( } indexCache, err := metercacher.New[string, linkeddb.LinkedDB]( - "index_cache", + metric.AppendNamespace(namespace, "index_cache"), metrics, lru.NewCache[string, linkeddb.LinkedDB](indexCacheSize), ) @@ -129,17 +131,13 @@ func NewMeteredUTXOState( } s := &utxoState{ - codec: codec, - utxoCache: utxoCache, - utxoDB: prefixdb.New(utxoPrefix, db), + utxoDB: utxoDB, - indexDB: prefixdb.New(indexPrefix, db), + indexDB: indexDB, indexCache: indexCache, - - trackChecksum: trackChecksum, } - return s, s.initChecksum() + return s, utxoDB.InitChecksum() } func (s *utxoState) GetUTXO(utxoID ids.ID) (*UTXO, error) { @@ -150,7 +148,7 @@ func (s *utxoState) GetUTXO(utxoID ids.ID) (*UTXO, error) { return utxo, nil } - bytes, err := s.utxoDB.Get(utxoID[:]) + utxo, err := s.utxoDB.Get(utxoID) if err == database.ErrNotFound { s.utxoCache.Put(utxoID, nil) return nil, database.ErrNotFound @@ -159,27 +157,16 @@ func (s *utxoState) GetUTXO(utxoID ids.ID) (*UTXO, error) { return nil, err } - // The key was in the database - utxo := &UTXO{} - if _, err := s.codec.Unmarshal(bytes, utxo); err != nil { - return nil, err - } - s.utxoCache.Put(utxoID, utxo) return utxo, nil } func (s *utxoState) PutUTXO(utxo *UTXO) error { - utxoBytes, err := s.codec.Marshal(codecVersion, utxo) - if err != nil { - return err - } - utxoID := utxo.InputID() - s.updateChecksum(utxoID) + s.utxoDB.UpdateChecksum(utxoID) s.utxoCache.Put(utxoID, utxo) - if err := s.utxoDB.Put(utxoID[:], utxoBytes); err != nil { + if err := s.utxoDB.Put(utxo); err != nil { return err } @@ -207,7 +194,7 @@ func (s *utxoState) DeleteUTXO(utxoID ids.ID) error { return err } - s.updateChecksum(utxoID) + s.utxoDB.UpdateChecksum(utxoID) s.utxoCache.Put(utxoID, nil) if err := s.utxoDB.Delete(utxoID[:]); err != nil { @@ -250,8 +237,8 @@ func (s *utxoState) UTXOIDs(addr []byte, start ids.ID, limit int) ([]ids.ID, err return utxoIDs, iter.Error() } -func (s *utxoState) Checksum() ids.ID { - return s.checksum +func (s *utxoState) Checksum() (ids.ID, error) { + return s.utxoDB.Checksum() } func (s *utxoState) getIndexDB(addr []byte) linkeddb.LinkedDB { @@ -266,12 +253,74 @@ func (s *utxoState) getIndexDB(addr []byte) linkeddb.LinkedDB { return indexList } -func (s *utxoState) initChecksum() error { - if !s.trackChecksum { +func (s *utxoState) Close() error { + if err := s.utxoDB.Close(); err != nil { + return fmt.Errorf("closing utxo db: %w", err) + } + + if err := s.indexDB.Close(); err != nil { + return fmt.Errorf("closing index db: %w", err) + } + + return nil +} + +// Deprecated: [FirewoodUTXODB] should be used after the firewood migration. +type UTXODatabase struct { + db database.Database + codec codec.Manager + trackChecksum bool + checksum ids.ID +} + +var _ UTXODB = (*UTXODatabase)(nil) + +func NewUTXODatabase( + db database.Database, + c codec.Manager, + trackChecksum bool, +) *UTXODatabase { + return &UTXODatabase{ + db: db, + codec: c, + trackChecksum: trackChecksum, + } +} + +func (u *UTXODatabase) Get(inputID ids.ID) (*UTXO, error) { + bytes, err := u.db.Get(inputID[:]) + if err != nil { + return nil, err + } + + utxo := &UTXO{} + if _, err := u.codec.Unmarshal(bytes, utxo); err != nil { + return nil, err + } + + return utxo, nil +} + +func (u *UTXODatabase) Put(utxo *UTXO) error { + utxoBytes, err := u.codec.Marshal(codecVersion, utxo) + if err != nil { + return err + } + + inputID := utxo.InputID() + return u.db.Put(inputID[:], utxoBytes) +} + +func (u *UTXODatabase) Delete(key []byte) error { + return u.db.Delete(key) +} + +func (u *UTXODatabase) InitChecksum() error { + if !u.trackChecksum { return nil } - it := s.utxoDB.NewIterator() + it := u.db.NewIterator() defer it.Release() for it.Next() { @@ -279,15 +328,23 @@ func (s *utxoState) initChecksum() error { if err != nil { return err } - s.updateChecksum(utxoID) + u.UpdateChecksum(utxoID) } return it.Error() } -func (s *utxoState) updateChecksum(modifiedID ids.ID) { - if !s.trackChecksum { +func (u *UTXODatabase) UpdateChecksum(modifiedID ids.ID) { + if !u.trackChecksum { return } - s.checksum = s.checksum.XOR(modifiedID) + u.checksum = u.checksum.XOR(modifiedID) +} + +func (u *UTXODatabase) Checksum() (ids.ID, error) { + return u.checksum, nil +} + +func (u *UTXODatabase) Close() error { + return u.db.Close() } diff --git a/vms/components/avax/utxo_state_test.go b/vms/components/avax/utxo_state_test.go index d0bc13310af3..cd31a69bc67a 100644 --- a/vms/components/avax/utxo_state_test.go +++ b/vms/components/avax/utxo_state_test.go @@ -12,6 +12,7 @@ import ( "github.com/ava-labs/avalanchego/codec/linearcodec" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/memdb" + "github.com/ava-labs/avalanchego/database/prefixdb" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/vms/secp256k1fx" ) @@ -52,7 +53,11 @@ func TestUTXOState(t *testing.T) { require.NoError(manager.RegisterCodec(codecVersion, c)) db := memdb.New() - s, err := NewUTXOState(db, manager, trackChecksum) + s, err := NewUTXOState( + prefixdb.New([]byte("foo"), db), + prefixdb.New([]byte("bar"), db), + manager, + trackChecksum) require.NoError(err) _, err = s.GetUTXO(utxoID) @@ -80,7 +85,12 @@ func TestUTXOState(t *testing.T) { require.NoError(s.PutUTXO(utxo)) - s, err = NewUTXOState(db, manager, trackChecksum) + s, err = NewUTXOState( + prefixdb.New([]byte("foo"), db), + prefixdb.New([]byte("bar"), db), + manager, + trackChecksum, + ) require.NoError(err) readUTXO, err = s.GetUTXO(utxoID) diff --git a/vms/platformvm/block/executor/acceptor.go b/vms/platformvm/block/executor/acceptor.go index 68af5a97a6ef..aa3d134976b6 100644 --- a/vms/platformvm/block/executor/acceptor.go +++ b/vms/platformvm/block/executor/acceptor.go @@ -39,8 +39,7 @@ func (a *acceptor) BanffCommitBlock(b *block.BanffCommitBlock) error { } func (a *acceptor) BanffProposalBlock(b *block.BanffProposalBlock) error { - a.proposalBlock(b, "banff proposal") - return nil + return a.proposalBlock(b, "banff proposal") } func (a *acceptor) BanffStandardBlock(b *block.BanffStandardBlock) error { @@ -56,8 +55,7 @@ func (a *acceptor) ApricotCommitBlock(b *block.ApricotCommitBlock) error { } func (a *acceptor) ApricotProposalBlock(b *block.ApricotProposalBlock) error { - a.proposalBlock(b, "apricot proposal") - return nil + return a.proposalBlock(b, "apricot proposal") } func (a *acceptor) ApricotStandardBlock(b *block.ApricotStandardBlock) error { @@ -102,13 +100,18 @@ func (a *acceptor) ApricotAtomicBlock(b *block.ApricotAtomicBlock) error { ) } + checksum, err := a.state.Checksum() + if err != nil { + return fmt.Errorf("getting checksum: %w", err) + } + a.ctx.Log.Trace( "accepted block", zap.String("blockType", "apricot atomic"), zap.Stringer("blkID", blkID), zap.Uint64("height", b.Height()), zap.Stringer("parentID", b.Parent()), - zap.Stringer("checksum", a.state.Checksum()), + zap.Stringer("checksum", checksum), ) return nil @@ -172,19 +175,24 @@ func (a *acceptor) optionBlock(b block.Block, blockType string) error { onAcceptFunc() } + checksum, err := a.state.Checksum() + if err != nil { + return fmt.Errorf("getting checksum: %w", err) + } + a.ctx.Log.Trace( "accepted block", zap.String("blockType", blockType), zap.Stringer("blkID", blkID), zap.Uint64("height", b.Height()), zap.Stringer("parentID", parentID), - zap.Stringer("checksum", a.state.Checksum()), + zap.Stringer("checksum", checksum), ) return nil } -func (a *acceptor) proposalBlock(b block.Block, blockType string) { +func (a *acceptor) proposalBlock(b block.Block, blockType string) error { // Note that: // // * We don't free the proposal block in this method. @@ -204,14 +212,21 @@ func (a *acceptor) proposalBlock(b block.Block, blockType string) { blkID := b.ID() a.backend.lastAccepted = blkID + checksum, err := a.state.Checksum() + if err != nil { + return fmt.Errorf("getting checksum: %w", err) + } + a.ctx.Log.Trace( "accepted block", zap.String("blockType", blockType), zap.Stringer("blkID", blkID), zap.Uint64("height", b.Height()), zap.Stringer("parentID", b.Parent()), - zap.Stringer("checksum", a.state.Checksum()), + zap.Stringer("checksum", checksum), ) + + return nil } func (a *acceptor) standardBlock(b block.Block, blockType string) error { @@ -251,13 +266,18 @@ func (a *acceptor) standardBlock(b block.Block, blockType string) error { onAcceptFunc() } + checksum, err := a.state.Checksum() + if err != nil { + return fmt.Errorf("getting checksum: %w", err) + } + a.ctx.Log.Trace( "accepted block", zap.String("blockType", blockType), zap.Stringer("blkID", blkID), zap.Uint64("height", b.Height()), zap.Stringer("parentID", b.Parent()), - zap.Stringer("checksum", a.state.Checksum()), + zap.Stringer("checksum", checksum), ) return nil diff --git a/vms/platformvm/block/executor/acceptor_test.go b/vms/platformvm/block/executor/acceptor_test.go index 3d81d358a066..1a216ec277b2 100644 --- a/vms/platformvm/block/executor/acceptor_test.go +++ b/vms/platformvm/block/executor/acceptor_test.go @@ -48,7 +48,7 @@ func TestAcceptorVisitProposalBlock(t *testing.T) { blkID := blk.ID() s := state.NewMockState(ctrl) - s.EXPECT().Checksum().Return(ids.Empty).Times(1) + s.EXPECT().Checksum().Return(ids.Empty, nil).Times(1) acceptor := &acceptor{ backend: &backend{ @@ -149,7 +149,7 @@ func TestAcceptorVisitAtomicBlock(t *testing.T) { s.EXPECT().Abort().Times(1) onAcceptState.EXPECT().Apply(s).Times(1) sharedMemory.EXPECT().Apply(atomicRequests, batch).Return(nil).Times(1) - s.EXPECT().Checksum().Return(ids.Empty).Times(1) + s.EXPECT().Checksum().Return(ids.Empty, nil).Times(1) require.NoError(acceptor.ApricotAtomicBlock(blk)) } @@ -235,7 +235,7 @@ func TestAcceptorVisitStandardBlock(t *testing.T) { s.EXPECT().Abort().Times(1) onAcceptState.EXPECT().Apply(s).Times(1) sharedMemory.EXPECT().Apply(atomicRequests, batch).Return(nil).Times(1) - s.EXPECT().Checksum().Return(ids.Empty).Times(1) + s.EXPECT().Checksum().Return(ids.Empty, nil).Times(1) require.NoError(acceptor.BanffStandardBlock(blk)) require.True(calledOnAcceptFunc) @@ -342,7 +342,7 @@ func TestAcceptorVisitCommitBlock(t *testing.T) { parentOnCommitState.EXPECT().Apply(s).Times(1), s.EXPECT().CommitBatch().Return(batch, nil).Times(1), sharedMemory.EXPECT().Apply(atomicRequests, batch).Return(nil).Times(1), - s.EXPECT().Checksum().Return(ids.Empty).Times(1), + s.EXPECT().Checksum().Return(ids.Empty, nil).Times(1), s.EXPECT().Abort().Times(1), ) @@ -451,7 +451,7 @@ func TestAcceptorVisitAbortBlock(t *testing.T) { parentOnAbortState.EXPECT().Apply(s).Times(1), s.EXPECT().CommitBatch().Return(batch, nil).Times(1), sharedMemory.EXPECT().Apply(atomicRequests, batch).Return(nil).Times(1), - s.EXPECT().Checksum().Return(ids.Empty).Times(1), + s.EXPECT().Checksum().Return(ids.Empty, nil).Times(1), s.EXPECT().Abort().Times(1), ) diff --git a/vms/platformvm/state/mock_state.go b/vms/platformvm/state/mock_state.go index ed7a447d63c9..2be87d11b99f 100644 --- a/vms/platformvm/state/mock_state.go +++ b/vms/platformvm/state/mock_state.go @@ -206,11 +206,12 @@ func (mr *MockStateMockRecorder) ApplyValidatorWeightDiffs(ctx, validators, star } // Checksum mocks base method. -func (m *MockState) Checksum() ids.ID { +func (m *MockState) Checksum() (ids.ID, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Checksum") ret0, _ := ret[0].(ids.ID) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // Checksum indicates an expected call of Checksum. diff --git a/vms/platformvm/state/state.go b/vms/platformvm/state/state.go index a12d472b6c47..3233c84606a3 100644 --- a/vms/platformvm/state/state.go +++ b/vms/platformvm/state/state.go @@ -96,6 +96,7 @@ var ( ActivePrefix = []byte("active") InactivePrefix = []byte("inactive") SingletonPrefix = []byte("singleton") + indexPrefix = []byte("index") TimestampKey = []byte("timestamp") FeeStateKey = []byte("fee state") @@ -275,7 +276,7 @@ type State interface { // pending changes to the base database. CommitBatch() (database.Batch, error) - Checksum() ids.ID + Checksum() (ids.ID, error) Close() error } @@ -686,7 +687,16 @@ func New( } utxoDB := prefixdb.New(UTXOPrefix, baseDB) - utxoState, err := avax.NewMeteredUTXOState(utxoDB, txs.GenesisCodec, metricsReg, execCfg.ChecksumsEnabled) + utxoState, err := avax.NewMeteredUTXOState( + "", + avax.NewUTXODatabase( + prefixdb.New(UTXOPrefix, utxoDB), + txs.GenesisCodec, + execCfg.ChecksumsEnabled, + ), + prefixdb.New(indexPrefix, utxoDB), + metricsReg, + ) if err != nil { return nil, err } @@ -2384,7 +2394,7 @@ func (s *state) Abort() { s.baseDB.Abort() } -func (s *state) Checksum() ids.ID { +func (s *state) Checksum() (ids.ID, error) { return s.utxoState.Checksum() }