Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/db"
Expand Down Expand Up @@ -286,7 +287,7 @@ func prepareNormalizedBalancesV5(bals []encodedBalanceRecordV5, proto config.Con
return nil, err
}
normalizedAccountBalances[i].AccountHashes = make([][]byte, 1)
normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilder(balance.Address, accountDataV5, balance.AccountData)
if len(resources) > 0 {
normalizedAccountBalances[i].Resources = make(map[basics.CreatableIndex]store.ResourcesData, len(resources))
normalizedAccountBalances[i].EncodedResources = make(map[basics.CreatableIndex][]byte, len(resources))
Expand Down Expand Up @@ -325,7 +326,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
normalizedAccountBalances[i].PartialBalance = true
} else {
normalizedAccountBalances[i].AccountHashes = make([][]byte, 1+len(balance.Resources))
normalizedAccountBalances[i].AccountHashes[0] = accountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
normalizedAccountBalances[i].AccountHashes[0] = store.AccountHashBuilderV6(balance.Address, &normalizedAccountBalances[i].AccountData, balance.AccountData)
curHashIdx++
}
if len(balance.Resources) > 0 {
Expand All @@ -337,7 +338,7 @@ func prepareNormalizedBalancesV6(bals []encodedBalanceRecordV6, proto config.Con
if err != nil {
return nil, err
}
normalizedAccountBalances[i].AccountHashes[curHashIdx], err = resourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
normalizedAccountBalances[i].AccountHashes[curHashIdx], err = store.ResourcesHashBuilderV6(&resData, balance.Address, basics.CreatableIndex(cidx), resData.UpdateRound, res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1279,11 +1280,11 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
// when migrating there is only MaxTxnLife blocks in the block DB
// since the original txTail.commmittedUpTo preserved only (rnd+1)-MaxTxnLife = 1000 blocks back
err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
latestBlockRound, err := blockLatest(blockTx)
latestBlockRound, err := blockdb.BlockLatest(blockTx)
if err != nil {
return fmt.Errorf("latest block number cannot be retrieved : %w", err)
}
latestHdr, err := blockGetHdr(blockTx, dbRound)
latestHdr, err := blockdb.BlockGetHdr(blockTx, dbRound)
if err != nil {
return fmt.Errorf("latest block header %d cannot be retrieved : %w", dbRound, err)
}
Expand All @@ -1299,7 +1300,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
if firstRound == basics.Round(0) {
firstRound++
}
if _, err := blockGet(blockTx, firstRound); err != nil {
if _, err := blockdb.BlockGet(blockTx, firstRound); err != nil {
// looks like not catchpoint but a regular migration, start from maxTxnLife + deeperBlockHistory back
firstRound = (latestBlockRound + 1).SubSaturate(maxTxnLife + deeperBlockHistory)
if firstRound == basics.Round(0) {
Expand All @@ -1308,7 +1309,7 @@ func performTxTailTableMigration(ctx context.Context, tx *sql.Tx, blockDb db.Acc
}
tailRounds := make([][]byte, 0, maxTxnLife)
for rnd := firstRound; rnd <= dbRound; rnd++ {
blk, err := blockGet(blockTx, rnd)
blk, err := blockdb.BlockGet(blockTx, rnd)
if err != nil {
return fmt.Errorf("block for round %d ( %d - %d ) cannot be retrieved : %w", rnd, firstRound, dbRound, err)
}
Expand Down Expand Up @@ -1343,7 +1344,7 @@ func performOnlineRoundParamsTailMigration(ctx context.Context, tx *sql.Tx, bloc
currentProto = initProto
} else {
err = blockDb.Atomic(func(ctx context.Context, blockTx *sql.Tx) error {
hdr, err := blockGetHdr(blockTx, rnd)
hdr, err := blockdb.BlockGetHdr(blockTx, rnd)
if err != nil {
return err
}
Expand Down Expand Up @@ -1514,7 +1515,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
return fmt.Errorf("accountsInitialize was unable to MakeTrie: %v", err)
}
for addr, state := range acctRehash {
deleteHash := accountHashBuilderV6(addr, &state.old, state.oldEnc)
deleteHash := store.AccountHashBuilderV6(addr, &state.old, state.oldEnc)
deleted, err := trie.Delete(deleteHash)
if err != nil {
return fmt.Errorf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v: %w", hex.EncodeToString(deleteHash), addr, err)
Expand All @@ -1523,7 +1524,7 @@ func performOnlineAccountsTableMigration(ctx context.Context, tx *sql.Tx, progre
log.Warnf("performOnlineAccountsTableMigration failed to delete hash '%s' from merkle trie for account %v", hex.EncodeToString(deleteHash), addr)
}

addHash := accountHashBuilderV6(addr, &state.new, state.newEnc)
addHash := store.AccountHashBuilderV6(addr, &state.new, state.newEnc)
added, err := trie.Add(addHash)
if err != nil {
return fmt.Errorf("performOnlineAccountsTableMigration attempted to add duplicate hash '%s' to merkle trie for account %v: %w", hex.EncodeToString(addHash), addr, err)
Expand Down Expand Up @@ -2466,7 +2467,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd
if iterator.step == oaiStepInsertAccountData {
var lastAddrID int64
baseCb := func(addr basics.Address, rowid int64, accountData *store.BaseAccountData, encodedAccountData []byte) (err error) {
hash := accountHashBuilderV6(addr, accountData, encodedAccountData)
hash := store.AccountHashBuilderV6(addr, accountData, encodedAccountData)
_, err = iterator.insertStmt.ExecContext(ctx, rowid, hash)
if err != nil {
return
Expand All @@ -2477,7 +2478,7 @@ func (iterator *orderedAccountsIter) Next(ctx context.Context) (acct []accountAd

resCb := func(addr basics.Address, cidx basics.CreatableIndex, resData *store.ResourcesData, encodedResourceData []byte, lastResource bool) error {
if resData != nil {
hash, err := resourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
hash, err := store.ResourcesHashBuilderV6(resData, addr, cidx, resData.UpdateRound, encodedResourceData)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/accountdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3244,7 +3244,7 @@ func TestRemoveOfflineStateProofID(t *testing.T) {
if expected && ba.Status != basics.Online {
require.Equal(t, merklesignature.Commitment{}, ba.StateProofID)
}
addHash := accountHashBuilderV6(addr, &ba, encodedAcctData)
addHash := store.AccountHashBuilderV6(addr, &ba, encodedAcctData)
added, err := trie.Add(addHash)
require.NoError(t, err)
require.True(t, added)
Expand Down
17 changes: 9 additions & 8 deletions ledger/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/internal"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
Expand Down Expand Up @@ -219,10 +220,10 @@ func TestArchivalRestart(t *testing.T) {

var latest, earliest basics.Round
err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
Expand All @@ -236,10 +237,10 @@ func TestArchivalRestart(t *testing.T) {
defer l.Close()

err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
Expand Down Expand Up @@ -754,10 +755,10 @@ func TestArchivalFromNonArchival(t *testing.T) {

var latest, earliest basics.Round
err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
Expand All @@ -774,10 +775,10 @@ func TestArchivalFromNonArchival(t *testing.T) {
defer l.Close()

err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
latest, err = blockLatest(tx)
latest, err = blockdb.BlockLatest(tx)
require.NoError(t, err)

earliest, err = blockEarliest(tx)
earliest, err = blockdb.BlockEarliest(tx)
require.NoError(t, err)
return err
})
Expand Down
23 changes: 12 additions & 11 deletions ledger/blockdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/store/blockdb"
storetesting "github.com/algorand/go-algorand/ledger/store/testing"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
Expand Down Expand Up @@ -78,19 +79,19 @@ func blockChainBlocks(be []blockEntry) []bookkeeping.Block {
}

func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
next, err := blockNext(tx)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file should go into blockdb package along with the code it tests

next, err := blockdb.BlockNext(tx)
require.NoError(t, err)
require.Equal(t, next, basics.Round(len(blocks)))

latest, err := blockLatest(tx)
latest, err := blockdb.BlockLatest(tx)
if len(blocks) == 0 {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, latest, basics.Round(len(blocks))-1)
}

earliest, err := blockEarliest(tx)
earliest, err := blockdb.BlockEarliest(tx)
if len(blocks) == 0 {
require.Error(t, err)
} else {
Expand All @@ -99,17 +100,17 @@ func checkBlockDB(t *testing.T, tx *sql.Tx, blocks []blockEntry) {
}

for rnd := basics.Round(0); rnd < basics.Round(len(blocks)); rnd++ {
blk, err := blockGet(tx, rnd)
blk, err := blockdb.BlockGet(tx, rnd)
require.NoError(t, err)
require.Equal(t, blk, blocks[rnd].block)

blk, cert, err := blockGetCert(tx, rnd)
blk, cert, err := blockdb.BlockGetCert(tx, rnd)
require.NoError(t, err)
require.Equal(t, blk, blocks[rnd].block)
require.Equal(t, cert, blocks[rnd].cert)
}

_, err = blockGet(tx, basics.Round(len(blocks)))
_, err = blockdb.BlockGet(tx, basics.Round(len(blocks)))
require.Error(t, err)
}

Expand All @@ -124,7 +125,7 @@ func TestBlockDBEmpty(t *testing.T) {
require.NoError(t, err)
defer tx.Rollback()

err = blockInit(tx, nil)
err = blockdb.BlockInit(tx, nil)
require.NoError(t, err)
checkBlockDB(t, tx, nil)
}
Expand All @@ -142,11 +143,11 @@ func TestBlockDBInit(t *testing.T) {

blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)
}
Expand All @@ -164,13 +165,13 @@ func TestBlockDBAppend(t *testing.T) {

blocks := randomInitChain(protocol.ConsensusCurrentVersion, 10)

err = blockInit(tx, blockChainBlocks(blocks))
err = blockdb.BlockInit(tx, blockChainBlocks(blocks))
require.NoError(t, err)
checkBlockDB(t, tx, blocks)

for i := 0; i < 10; i++ {
blkent := randomBlock(basics.Round(len(blocks)))
err = blockPut(tx, blkent.block, blkent.cert)
err = blockdb.BlockPut(tx, blkent.block, blkent.cert)
require.NoError(t, err)

blocks = append(blocks, blkent)
Expand Down
15 changes: 8 additions & 7 deletions ledger/blockqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
Expand Down Expand Up @@ -61,7 +62,7 @@ func bqInit(l *Ledger) (*blockQueue, error) {
start := time.Now()
err := bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
bq.lastCommitted, err0 = blockLatest(tx)
bq.lastCommitted, err0 = blockdb.BlockLatest(tx)
return err0
})
ledgerBlockqInitMicros.AddMicrosecondsSince(start, nil)
Expand Down Expand Up @@ -111,7 +112,7 @@ func (bq *blockQueue) syncer() {
ledgerSyncBlockputCount.Inc(nil)
err := bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
for _, e := range workQ {
err0 := blockPut(tx, e.block, e.cert)
err0 := blockdb.BlockPut(tx, e.block, e.cert)
if err0 != nil {
return err0
}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (bq *blockQueue) syncer() {
bfstart := time.Now()
ledgerSyncBlockforgetCount.Inc(nil)
err = bq.l.blockDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
return blockForgetBefore(tx, minToSave)
return blockdb.BlockForgetBefore(tx, minToSave)
})
ledgerSyncBlockforgetMicros.AddMicrosecondsSince(bfstart, nil)
if err != nil {
Expand Down Expand Up @@ -261,7 +262,7 @@ func (bq *blockQueue) getBlock(r basics.Round) (blk bookkeeping.Block, err error
ledgerGetblockCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, err0 = blockGet(tx, r)
blk, err0 = blockdb.BlockGet(tx, r)
return err0
})
ledgerGetblockMicros.AddMicrosecondsSince(start, nil)
Expand All @@ -283,7 +284,7 @@ func (bq *blockQueue) getBlockHdr(r basics.Round) (hdr bookkeeping.BlockHeader,
ledgerGetblockhdrCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
hdr, err0 = blockGetHdr(tx, r)
hdr, err0 = blockdb.BlockGetHdr(tx, r)
return err0
})
ledgerGetblockhdrMicros.AddMicrosecondsSince(start, nil)
Expand All @@ -309,7 +310,7 @@ func (bq *blockQueue) getEncodedBlockCert(r basics.Round) (blk []byte, cert []by
ledgerGeteblockcertCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, cert, err0 = blockGetEncodedCert(tx, r)
blk, cert, err0 = blockdb.BlockGetEncodedCert(tx, r)
return err0
})
ledgerGeteblockcertMicros.AddMicrosecondsSince(start, nil)
Expand All @@ -331,7 +332,7 @@ func (bq *blockQueue) getBlockCert(r basics.Round) (blk bookkeeping.Block, cert
ledgerGetblockcertCount.Inc(nil)
err = bq.l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
blk, cert, err0 = blockGetCert(tx, r)
blk, cert, err0 = blockdb.BlockGetCert(tx, r)
return err0
})
ledgerGetblockcertMicros.AddMicrosecondsSince(start, nil)
Expand Down
Loading