diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index 77889289e2..3787636fde 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -1169,9 +1169,9 @@ func (au *accountUpdates) lookupResource(rnd basics.Round, addr basics.Address, return macct.AccountResource(), rnd, nil } - // check baseAccoiunts again to see if it does not exist + // check baseResources again to see if it does not exist if au.baseResources.readNotFound(addr, aidx) { - // it seems the account doesnt exist + // it seems the resource doesn't exist return ledgercore.AccountResource{}, rnd, nil } @@ -1388,9 +1388,9 @@ func (au *accountUpdates) lookupWithoutRewards(rnd basics.Round, addr basics.Add return macct.AccountData.GetLedgerCoreAccountData(), rnd, rewardsVersion, rewardsLevel, nil } - // check baseAccoiunts again to see if it does not exist + // check baseAccounts again to see if it does not exist if au.baseAccounts.readNotFound(addr) { - // it seems the account doesnt exist + // it seems the account doesn't exist return ledgercore.AccountData{}, rnd, rewardsVersion, rewardsLevel, nil } diff --git a/ledger/eval/eval.go b/ledger/eval/eval.go index 5c44073e0a..4b74f78518 100644 --- a/ledger/eval/eval.go +++ b/ledger/eval/eval.go @@ -129,7 +129,7 @@ type roundCowBase struct { // execution. The AccountData is always an historical one, then therefore won't be changing. // The underlying (accountupdates) infrastructure may provide additional cross-round caching which // are beyond the scope of this cache. - // The account data store here is always the account data without the rewards. + // The account data stored here is always the account data without the rewards. accounts map[basics.Address]ledgercore.AccountData // The online accounts that we've already accessed during this round evaluation. This is a @@ -2018,8 +2018,7 @@ func (validator *evalTxValidator) run() { RewardsPool: validator.block.BlockHeader.RewardsPool, } - var unverifiedTxnGroups [][]transactions.SignedTxn - unverifiedTxnGroups = make([][]transactions.SignedTxn, 0, len(validator.txgroups)) + unverifiedTxnGroups := make([][]transactions.SignedTxn, 0, len(validator.txgroups)) for _, group := range validator.txgroups { signedTxnGroup := make([]transactions.SignedTxn, len(group)) for j, txn := range group { @@ -2076,7 +2075,7 @@ func Eval(ctx context.Context, l LedgerForEvaluator, blk bookkeeping.Block, vali } accountLoadingCtx, accountLoadingCancel := context.WithCancel(ctx) - preloadedTxnsData := prefetcher.PrefetchAccounts(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol()) + preloadedTxnsData := prefetcher.BlockReferences(accountLoadingCtx, l, blk.Round()-1, paysetgroups, blk.BlockHeader.FeeSink, blk.ConsensusProtocol()) // ensure that before we exit from this method, the account loading is no longer active. defer func() { accountLoadingCancel() @@ -2163,6 +2162,11 @@ transactionGroupLoop: } } } + for _, lkv := range txgroup.KVs { + if _, have := base.kvStore[lkv.Key]; !have { + base.kvStore[lkv.Key] = lkv.Value + } + } } err = eval.TransactionGroup(txgroup.TxnGroup...) if err != nil { diff --git a/ledger/eval/prefetcher/error.go b/ledger/eval/prefetcher/error.go deleted file mode 100644 index 9525525e2a..0000000000 --- a/ledger/eval/prefetcher/error.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (C) 2019-2026 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . - -package prefetcher - -import ( - "fmt" - - "github.com/algorand/go-algorand/data/basics" -) - -// GroupTaskError indicates the group index of the unfulfilled resource -type GroupTaskError struct { - err error - GroupIdx int64 - Address *basics.Address - CreatableIndex basics.CreatableIndex - CreatableType basics.CreatableType -} - -// Error satisfies builtin interface `error` -func (err *GroupTaskError) Error() string { - return fmt.Sprintf("prefetch failed for groupIdx %d, address: %s, creatableIndex %d, creatableType %d, cause: %v", - err.GroupIdx, err.Address, err.CreatableIndex, err.CreatableType, err.err) -} - -// Unwrap provides access to the underlying error -func (err *GroupTaskError) Unwrap() error { - return err.err -} diff --git a/ledger/eval/prefetcher/prefetcher.go b/ledger/eval/prefetcher/prefetcher.go index 891611ffc6..5696464988 100644 --- a/ledger/eval/prefetcher/prefetcher.go +++ b/ledger/eval/prefetcher/prefetcher.go @@ -18,19 +18,24 @@ package prefetcher import ( "context" + "fmt" + "runtime" "sync/atomic" + "github.com/algorand/avm-abi/apps" + "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/util" ) // asyncAccountLoadingThreadCount controls how many go routines would be used // to load the account data before the Eval() start processing individual // transaction group. -const asyncAccountLoadingThreadCount = 4 +var asyncAccountLoadingThreadCount = min(8, (runtime.NumCPU()+1)/2) // Ledger is a ledger interfaces for prefetcher. type Ledger interface { @@ -38,6 +43,7 @@ type Ledger interface { LookupAsset(basics.Round, basics.Address, basics.AssetIndex) (ledgercore.AssetResource, error) LookupApplication(basics.Round, basics.Address, basics.AppIndex) (ledgercore.AppResource, error) GetCreatorForRound(basics.Round, basics.CreatableIndex, basics.CreatableType) (basics.Address, bool, error) + LookupKv(basics.Round, string) ([]byte, error) } // LoadedAccountDataEntry describes a loaded account. @@ -46,8 +52,8 @@ type LoadedAccountDataEntry struct { Data *ledgercore.AccountData } -// LoadedResourcesEntry describes a loaded resource. -type LoadedResourcesEntry struct { +// LoadedResourceEntry describes a loaded resource. +type LoadedResourceEntry struct { // Resource is the loaded Resource entry. unless address is nil, Resource would always contain a valid ledgercore.AccountResource pointer. Resource *ledgercore.AccountResource // Address might be empty if the resource does not exist. In that case creatableIndex and creatableType would still be valid while resource would be nil. @@ -56,24 +62,33 @@ type LoadedResourcesEntry struct { CreatableType basics.CreatableType } +// LoadedKVEntry describes a loaded kv. +type LoadedKVEntry struct { + Key string + Value []byte +} + // LoadedTransactionGroup is a helper struct to allow asynchronous loading of the account data needed by the transaction groups type LoadedTransactionGroup struct { // the transaction group TxnGroup []transactions.SignedTxnWithAD - // Accounts is a list of all the Accounts balance records that the transaction group refer to and are needed. + // Accounts is a list of all the Account balance records for the transaction group. Accounts []LoadedAccountDataEntry - // the following four are the Resources used by the account - Resources []LoadedResourcesEntry + // Resources is the list of all Resources (apps/assets/hodling/locals) for the transaction group. + Resources []LoadedResourceEntry + + // KVs is the list of all kvs for the transaction group + KVs []LoadedKVEntry // Err indicates whether any of the balances in this structure have failed to load. In case of an error, at least // one of the entries in the balances would be uninitialized. - Err *GroupTaskError + Err error } -// accountPrefetcher used to prefetch accounts balances and resources before the evaluator is being called. -type accountPrefetcher struct { +// paysetPrefetcher used to prefetch accounts balances and resources before the evaluator is called. +type paysetPrefetcher struct { ledger Ledger rnd basics.Round txnGroups [][]transactions.SignedTxnWithAD @@ -82,10 +97,12 @@ type accountPrefetcher struct { outChan chan LoadedTransactionGroup } -// PrefetchAccounts loads the account data for the provided transaction group list. It also loads the feeSink account and add it to the first returned transaction group. -// The order of the transaction groups returned by the channel is identical to the one in the input array. -func PrefetchAccounts(ctx context.Context, l Ledger, rnd basics.Round, txnGroups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) <-chan LoadedTransactionGroup { - prefetcher := &accountPrefetcher{ +// BlockReferences loads the resources for the provided transaction group list. It also +// loads the feeSink account and adds it to the first returned transaction +// group. The order of the transaction groups returned by the channel is +// identical to the one in the input array. +func BlockReferences(ctx context.Context, l Ledger, rnd basics.Round, txnGroups [][]transactions.SignedTxnWithAD, feeSinkAddr basics.Address, consensusParams config.ConsensusParams) <-chan LoadedTransactionGroup { + prefetcher := &paysetPrefetcher{ ledger: l, rnd: rnd, txnGroups: txnGroups, @@ -101,25 +118,32 @@ func PrefetchAccounts(ctx context.Context, l Ledger, rnd basics.Round, txnGroups // groupTask helps to organize the account loading for each transaction group. type groupTask struct { // incompleteCount is the number of resources+balances still pending and need to be loaded - // this variable is used by as atomic variable to synchronize the readiness of the group taks. + // it is used to synchronize the readiness of the group task. incompleteCount atomic.Int64 // the group task index - aligns with the index of the transaction group in the // provided groups slice. groupTaskIndex atomic.Int64 + // balances contains the loaded balances each transaction group have balances []LoadedAccountDataEntry - // balancesCount is the number of balances that nees to be loaded per transaction group + // balancesCount is the number of balances that need to be loaded for this transaction group balancesCount int + // resources contains the loaded resources each of the transaction groups have - resources []LoadedResourcesEntry - // resourcesCount is the number of resources that nees to be loaded per transaction group + resources []LoadedResourceEntry + // resourcesCount is the number of resources that need to be loaded for this transaction group resourcesCount int + // kvs contains the loaded resources each of the transaction groups have + kvs []LoadedKVEntry + // kvCount is the number of kvs that need to be loaded for this transaction group + kvCount int + // error while processing this group task - err *GroupTaskError + err error } -// preloaderTask manage the loading of a single element, whether it's a resource or an account address. +// preloaderTask manage the loading of a single element, whether account, creatable, or kv type preloaderTask struct { // account address to fetch address *basics.Address @@ -127,64 +151,39 @@ type preloaderTask struct { creatableIndex basics.CreatableIndex // resource type creatableType basics.CreatableType - // a list of transaction group tasks that depends on this address or resource - groupTasks []*groupTask - // a list of indices into the groupTask.balances or groupTask.resources where the address would be stored - groupTasksIndices []int -} -// preloaderTaskQueue is a dynamic linked list of enqueued entries, optimized for non-syncronized insertion and -// syncronized extraction -type preloaderTaskQueue struct { - next *preloaderTaskQueue - used int - entries []*preloaderTask - baseIdx int - maxTxnGroupEntries int -} + // key is the kv to fetch, if this is a kv task + key string -type groupTaskDone struct { - groupIdx int64 - err error - task *preloaderTask + // the transaction group task to put the loaded data into + groupTask *groupTask + // the index at which to place the resource (in groupTask) + groupTaskIndex int } -func allocPreloaderQueue(count int, maxTxnGroupEntries int) preloaderTaskQueue { - return preloaderTaskQueue{ - entries: make([]*preloaderTask, count*2+maxTxnGroupEntries*2), - maxTxnGroupEntries: maxTxnGroupEntries, - } +type preloaderTaskQueue util.PagedQueue[*preloaderTask] + +func allocPreloaderQueue(count int) *preloaderTaskQueue { + return (*preloaderTaskQueue)(util.NewPagedQueue[*preloaderTask](count)) } -// enqueue places the queued entry on the queue, returning the latest queue -// ( in case the current "page" ran out of space ) -func (pq *preloaderTaskQueue) enqueue(t *preloaderTask) { - pq.entries[pq.used] = t - pq.used++ +func (pq *preloaderTaskQueue) append(t *preloaderTask) *preloaderTaskQueue { + return (*preloaderTaskQueue)((*util.PagedQueue[*preloaderTask])(pq).Append(t)) } -func (pq *preloaderTaskQueue) expand() *preloaderTaskQueue { - if cap(pq.entries)-pq.used < pq.maxTxnGroupEntries { - pq.next = &preloaderTaskQueue{ - entries: make([]*preloaderTask, cap(pq.entries)*2), - used: 0, - baseIdx: pq.baseIdx + pq.used, - maxTxnGroupEntries: pq.maxTxnGroupEntries, - } - return pq.next - } - return pq +func (pq *preloaderTaskQueue) length() int64 { + return int64((*util.PagedQueue[*preloaderTask])(pq).Len()) } func (pq *preloaderTaskQueue) getTaskAtIndex(idx int) (*preloaderTaskQueue, *preloaderTask) { - localIdx := idx - pq.baseIdx - if pq.used > localIdx { - return pq, pq.entries[localIdx] - } - if pq.next != nil { - return pq.next.getTaskAtIndex(idx) - } - return pq, nil + page, task := (*util.PagedQueue[*preloaderTask])(pq).Get(idx) + return (*preloaderTaskQueue)(page), task +} + +type groupTaskDone struct { + groupIdx int64 + err error + task *preloaderTask } type accountCreatableKey struct { @@ -192,31 +191,42 @@ type accountCreatableKey struct { cidx basics.CreatableIndex } -func loadAccountsAddAccountTask(addr *basics.Address, wt *groupTask, accountTasks map[basics.Address]*preloaderTask, queue *preloaderTaskQueue) { +func (pq *preloaderTaskQueue) addAccountTask(addr *basics.Address, wt *groupTask, accountTasks map[basics.Address]*preloaderTask) *preloaderTaskQueue { if addr.IsZero() { - return + return pq } - if task, have := accountTasks[*addr]; !have { + if _, have := accountTasks[*addr]; !have { newTask := &preloaderTask{ - address: addr, - groupTasks: make([]*groupTask, 1, 4), - groupTasksIndices: make([]int, 1, 4), + address: addr, + groupTask: wt, + groupTaskIndex: wt.balancesCount, } - newTask.groupTasks[0] = wt - newTask.groupTasksIndices[0] = wt.balancesCount - + wt.balancesCount++ accountTasks[*addr] = newTask - queue.enqueue(newTask) - } else { - task.groupTasks = append(task.groupTasks, wt) - task.groupTasksIndices = append(task.groupTasksIndices, wt.balancesCount) + pq = pq.append(newTask) } - wt.balancesCount++ + return pq +} + +func (pq *preloaderTaskQueue) addAssetTask(aid basics.AssetIndex, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask) *preloaderTaskQueue { + return pq.addResourceTask(nil, basics.CreatableIndex(aid), basics.AssetCreatable, wt, resourceTasks) +} + +func (pq *preloaderTaskQueue) addHoldingTask(addr basics.Address, aid basics.AssetIndex, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask) *preloaderTaskQueue { + return pq.addResourceTask(&addr, basics.CreatableIndex(aid), basics.AssetCreatable, wt, resourceTasks) +} + +func (pq *preloaderTaskQueue) addAppTask(aid basics.AppIndex, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask) *preloaderTaskQueue { + return pq.addResourceTask(nil, basics.CreatableIndex(aid), basics.AppCreatable, wt, resourceTasks) +} + +func (pq *preloaderTaskQueue) addLocalsTask(addr basics.Address, aid basics.AppIndex, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask) *preloaderTaskQueue { + return pq.addResourceTask(&addr, basics.CreatableIndex(aid), basics.AppCreatable, wt, resourceTasks) } -func loadAccountsAddResourceTask(addr *basics.Address, cidx basics.CreatableIndex, ctype basics.CreatableType, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask, queue *preloaderTaskQueue) { +func (pq *preloaderTaskQueue) addResourceTask(addr *basics.Address, cidx basics.CreatableIndex, ctype basics.CreatableType, wt *groupTask, resourceTasks map[accountCreatableKey]*preloaderTask) *preloaderTaskQueue { if cidx == 0 { - return + return pq } key := accountCreatableKey{ cidx: cidx, @@ -224,48 +234,58 @@ func loadAccountsAddResourceTask(addr *basics.Address, cidx basics.CreatableInde if addr != nil { key.address = *addr } - if task, have := resourceTasks[key]; !have { + if _, have := resourceTasks[key]; !have { newTask := &preloaderTask{ - address: addr, - groupTasks: make([]*groupTask, 1, 4), - groupTasksIndices: make([]int, 1, 4), - creatableIndex: cidx, - creatableType: ctype, + address: addr, + groupTask: wt, + groupTaskIndex: wt.resourcesCount, + creatableIndex: cidx, + creatableType: ctype, } - newTask.groupTasks[0] = wt - newTask.groupTasksIndices[0] = wt.resourcesCount - + wt.resourcesCount++ resourceTasks[key] = newTask - queue.enqueue(newTask) - } else { - task.groupTasks = append(task.groupTasks, wt) - task.groupTasksIndices = append(task.groupTasksIndices, wt.resourcesCount) + pq = pq.append(newTask) + } + return pq +} + +func (pq *preloaderTaskQueue) addKvTask(app basics.AppIndex, name []byte, wt *groupTask, kvTasks map[string]*preloaderTask) *preloaderTaskQueue { + if app == 0 || len(name) == 0 { + return pq + } + key := apps.MakeBoxKey(uint64(app), string(name)) + if _, have := kvTasks[key]; !have { + newTask := &preloaderTask{ + key: key, + groupTask: wt, + groupTaskIndex: wt.kvCount, + } + wt.kvCount++ + kvTasks[key] = newTask + pq = pq.append(newTask) } - wt.resourcesCount++ + return pq } // prefetch would process the input transaction groups by analyzing each of the transaction groups and building // an execution queue that would allow us to fetch all the dependencies for the input transaction groups in order // and output these onto a channel. -func (p *accountPrefetcher) prefetch(ctx context.Context) { +func (p *paysetPrefetcher) prefetch(ctx context.Context) { defer close(p.outChan) accountTasks := make(map[basics.Address]*preloaderTask) resourceTasks := make(map[accountCreatableKey]*preloaderTask) + kvTasks := make(map[string]*preloaderTask) - var maxTxnGroupEntries int - if p.consensusParams.Application { - // the extra two are for the sender account data, plus the application global state - maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * (2 + p.consensusParams.MaxAppTxnAccounts + p.consensusParams.MaxAppTxnForeignApps + p.consensusParams.MaxAppTxnForeignAssets) - } else { - // 8 is the number of resources+account used in the AssetTransferTx, which is the largest one. - maxTxnGroupEntries = p.consensusParams.MaxTxGroupSize * 8 + txnCount := 0 + for _, group := range p.txnGroups { + txnCount += len(group) } - - tasksQueue := allocPreloaderQueue(len(p.txnGroups), maxTxnGroupEntries) + tasksQueue := allocPreloaderQueue(4 * txnCount) // 4 is just an approximation // totalBalances counts the total number of balances over all the transaction groups totalBalances := 0 totalResources := 0 + totalKVs := 0 // initialize empty groupTasks for groupsReady groupsReady := make([]*groupTask, len(p.txnGroups)) @@ -273,108 +293,172 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { groupsReady[i] = new(groupTask) // this ensures each allocated groupTask is 64-bit aligned } + // iterate over the transaction groups and add resources that are very likely to be accessed + queue := tasksQueue + // Add fee sink to the first group if len(p.txnGroups) > 0 { // the feeSinkAddr is known to be non-empty feeSinkPreloader := &preloaderTask{ - address: &p.feeSinkAddr, - groupTasks: []*groupTask{groupsReady[0]}, - groupTasksIndices: []int{0}, + address: &p.feeSinkAddr, + groupTask: groupsReady[0], + groupTaskIndex: 0, } - groupsReady[0].balancesCount = 1 + groupsReady[0].balancesCount++ accountTasks[p.feeSinkAddr] = feeSinkPreloader - tasksQueue.enqueue(feeSinkPreloader) + queue = queue.append(feeSinkPreloader) } - - // iterate over the transaction groups and add all their account addresses to the list - queue := &tasksQueue for i := range p.txnGroups { task := groupsReady[i] for j := range p.txnGroups[i] { stxn := &p.txnGroups[i][j] switch stxn.Txn.Type { case protocol.PaymentTx: - loadAccountsAddAccountTask(&stxn.Txn.Receiver, task, accountTasks, queue) - loadAccountsAddAccountTask(&stxn.Txn.CloseRemainderTo, task, accountTasks, queue) + queue = queue.addAccountTask(&stxn.Txn.Receiver, task, accountTasks) + queue = queue.addAccountTask(&stxn.Txn.CloseRemainderTo, task, accountTasks) case protocol.AssetConfigTx: - loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ConfigAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addAssetTask(stxn.Txn.ConfigAsset, task, resourceTasks) case protocol.AssetTransferTx: if !stxn.Txn.AssetSender.IsZero() { - loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) - loadAccountsAddResourceTask(&stxn.Txn.AssetSender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addAssetTask(stxn.Txn.XferAsset, task, resourceTasks) + queue = queue.addHoldingTask(stxn.Txn.AssetSender, stxn.Txn.XferAsset, task, resourceTasks) } else { if stxn.Txn.AssetAmount == 0 && (stxn.Txn.AssetReceiver == stxn.Txn.Sender) { // opt in - loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addAssetTask(stxn.Txn.XferAsset, task, resourceTasks) } if stxn.Txn.AssetAmount != 0 { // zero transfer is noop - loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addHoldingTask(stxn.Txn.Sender, stxn.Txn.XferAsset, task, resourceTasks) } } if !stxn.Txn.AssetReceiver.IsZero() { if stxn.Txn.AssetAmount != 0 || (stxn.Txn.AssetReceiver == stxn.Txn.Sender) { // if not zero transfer or opt in then prefetch - loadAccountsAddResourceTask(&stxn.Txn.AssetReceiver, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addHoldingTask(stxn.Txn.AssetReceiver, stxn.Txn.XferAsset, task, resourceTasks) } } if !stxn.Txn.AssetCloseTo.IsZero() { - loadAccountsAddResourceTask(&stxn.Txn.AssetCloseTo, basics.CreatableIndex(stxn.Txn.XferAsset), basics.AssetCreatable, task, resourceTasks, queue) + queue = queue.addHoldingTask(stxn.Txn.AssetCloseTo, stxn.Txn.XferAsset, task, resourceTasks) } case protocol.AssetFreezeTx: if !stxn.Txn.FreezeAccount.IsZero() { - loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue) - loadAccountsAddResourceTask(&stxn.Txn.FreezeAccount, basics.CreatableIndex(stxn.Txn.FreezeAsset), basics.AssetCreatable, task, resourceTasks, queue) - loadAccountsAddAccountTask(&stxn.Txn.FreezeAccount, task, accountTasks, queue) + queue = queue.addAssetTask(stxn.Txn.FreezeAsset, task, resourceTasks) + queue = queue.addHoldingTask(stxn.Txn.FreezeAccount, stxn.Txn.FreezeAsset, task, resourceTasks) } case protocol.ApplicationCallTx: if stxn.Txn.ApplicationID != 0 { // load the global - so that we'll have the program - loadAccountsAddResourceTask(nil, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue) + queue = queue.addAppTask(stxn.Txn.ApplicationID, task, resourceTasks) // load the local - so that we'll have the local state // TODO: this is something we need to decide if we want to enable, since not // every application call would use local storage. if (stxn.Txn.ApplicationCallTxnFields.OnCompletion == transactions.OptInOC) || (stxn.Txn.ApplicationCallTxnFields.OnCompletion == transactions.CloseOutOC) || (stxn.Txn.ApplicationCallTxnFields.OnCompletion == transactions.ClearStateOC) { - loadAccountsAddResourceTask(&stxn.Txn.Sender, basics.CreatableIndex(stxn.Txn.ApplicationID), basics.AppCreatable, task, resourceTasks, queue) + queue = queue.addLocalsTask(stxn.Txn.Sender, stxn.Txn.ApplicationID, task, resourceTasks) } } - // do not preload Txn.ForeignApps, Txn.ForeignAssets, Txn.Accounts - // since they might be non-used arbitrary values + // Prefetch ForeignApps since they're likely to be accessed + for _, appID := range stxn.Txn.ForeignApps { + queue = queue.addAppTask(appID, task, resourceTasks) + } + + // Prefetch boxes, they ought to be precise + for _, br := range stxn.Txn.Boxes { + if len(br.Name) == 0 { + continue + } + // defense: we don't even know if WellFormed yet. + if br.Index > uint64(len(stxn.Txn.ForeignApps)) { + continue + } + app := stxn.Txn.ApplicationID + if br.Index != 0 { + app = stxn.Txn.ForeignApps[br.Index-1] + } + if app != 0 { + queue = queue.addKvTask(app, br.Name, task, kvTasks) + } + } + + // With tx.Access, cross-products are explicit, so we fetch them + // (and the apps and boxes, as with foreign arrays). We also + // fetch the accounts and assets if they are NOT used in + // cross-products. That implies they are directly needed. + if len(stxn.Txn.Access) > 0 { + // Track which accounts and assets appear in cross-products + accountInCrossProduct := util.MakeSet[basics.Address]() + assetInCrossProduct := util.MakeSet[basics.AssetIndex]() + + for _, rr := range stxn.Txn.Access { + if rr.App != 0 { + queue = queue.addResourceTask(nil, basics.CreatableIndex(rr.App), basics.AppCreatable, task, resourceTasks) + } + if !rr.Holding.Empty() { + addr, asset, err := rr.Holding.Resolve(stxn.Txn.Access, stxn.Txn.Sender) + if err == nil { + queue = queue.addHoldingTask(addr, asset, task, resourceTasks) + accountInCrossProduct.Add(addr) + assetInCrossProduct.Add(asset) + } + } + if !rr.Locals.Empty() { + addr, app, err := rr.Locals.Resolve(stxn.Txn.Access, stxn.Txn.Sender, stxn.Txn.ApplicationID) + if err == nil { + queue = queue.addLocalsTask(addr, app, task, resourceTasks) + accountInCrossProduct.Add(addr) + } + } + if !rr.Box.Empty() { + app, name, err := rr.Box.Resolve(stxn.Txn.Access) + if err == nil { + if app == 0 { + app = stxn.Txn.ApplicationID + } + queue = queue.addKvTask(app, []byte(name), task, kvTasks) + } + } + } + + // Presumably, accounts and assets that don't appear in + // cross-products are present to be directly accessed. + for _, rr := range stxn.Txn.Access { + if !rr.Address.IsZero() && !accountInCrossProduct.Contains(rr.Address) { + queue = queue.addAccountTask(&rr.Address, task, accountTasks) + } + if rr.Asset != 0 && !assetInCrossProduct.Contains(rr.Asset) { + queue = queue.addAssetTask(rr.Asset, task, resourceTasks) + } + } + } case protocol.StateProofTx: case protocol.KeyRegistrationTx: // No extra accounts besides the sender case protocol.HeartbeatTx: - loadAccountsAddAccountTask(&stxn.Txn.HbAddress, task, accountTasks, queue) + queue = queue.addAccountTask(&stxn.Txn.HbAddress, task, accountTasks) } // If you add new addresses here, also add them in getTxnAddresses(). if !stxn.Txn.Sender.IsZero() { - loadAccountsAddAccountTask(&stxn.Txn.Sender, task, accountTasks, queue) + queue = queue.addAccountTask(&stxn.Txn.Sender, task, accountTasks) } } totalBalances += task.balancesCount totalResources += task.resourcesCount - // expand the queue if needed. - queue = queue.expand() + totalKVs += task.kvCount } - // find the number of tasks - tasksCount := int64(0) - for lastQueueEntry := &tasksQueue; ; lastQueueEntry = lastQueueEntry.next { - if lastQueueEntry.next == nil { - tasksCount = int64(lastQueueEntry.baseIdx + lastQueueEntry.used) - break - } - } + tasksCount := queue.length() // update all the groups task : // allocate the correct number of balances, as well as // enough space on the "done" channel. allBalances := make([]LoadedAccountDataEntry, totalBalances) - allResources := make([]LoadedResourcesEntry, totalResources) + allResources := make([]LoadedResourceEntry, totalResources) + allKVs := make([]LoadedKVEntry, totalKVs) usedBalances := 0 usedResources := 0 + usedKVs := 0 // groupDoneCh is used to communicate the completion signal for a single // resource/address load between the go-routines and the main output channel @@ -385,13 +469,13 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { for grpIdx := range groupsReady { gr := groupsReady[grpIdx] gr.groupTaskIndex.Store(int64(grpIdx)) - gr.incompleteCount.Store(int64(gr.balancesCount + gr.resourcesCount)) + gr.incompleteCount.Store(int64(gr.balancesCount + gr.resourcesCount + gr.kvCount)) gr.balances = allBalances[usedBalances : usedBalances+gr.balancesCount] - if gr.resourcesCount > 0 { - gr.resources = allResources[usedResources : usedResources+gr.resourcesCount] - usedResources += gr.resourcesCount - } usedBalances += gr.balancesCount + gr.resources = allResources[usedResources : usedResources+gr.resourcesCount] + usedResources += gr.resourcesCount + gr.kvs = allKVs[usedKVs : usedKVs+gr.kvCount] + usedKVs += gr.kvCount if gr.incompleteCount.Load() == 0 { gr.incompleteCount.Store(dependencyFreeGroup) } @@ -400,9 +484,9 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { var taskIdx atomic.Int64 taskIdx.Store(-1) defer taskIdx.Store(tasksCount) - // create few go-routines to load asyncroniously the account data. - for i := 0; i < asyncAccountLoadingThreadCount; i++ { - go p.asyncPrefetchRoutine(&tasksQueue, &taskIdx, groupDoneCh) + // create a few go-routines to asynchronously perform prefetches + for range asyncAccountLoadingThreadCount { + go p.asyncPrefetchRoutine(tasksQueue, &taskIdx, groupDoneCh) } // iterate on the transaction groups tasks. This array retains the original order. @@ -414,16 +498,18 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { select { case done := <-groupDoneCh: if done.err != nil { - groupsReady[done.groupIdx].err = &GroupTaskError{ - err: done.err, - GroupIdx: done.groupIdx, - Address: done.task.address, - CreatableIndex: done.task.creatableIndex, - CreatableType: done.task.creatableType, + var e error + if done.task.key != "" { + e = fmt.Errorf("prefetch failed for groupIdx %d, kv: %x, cause: %w", + done.groupIdx, done.task.key, done.err) + } else { + e = fmt.Errorf("prefetch failed for groupIdx %d, address: %s, creatableIndex %d, creatableType %d, cause: %w", + done.groupIdx, done.task.address, done.task.creatableIndex, done.task.creatableType, done.err) } + groupsReady[done.groupIdx].err = e } if done.groupIdx > i { - // mark future txn as ready. + // mark future txngroup as ready. completed[done.groupIdx] = true goto wait } else if done.groupIdx < i { @@ -434,8 +520,8 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { return } } - next := i - for ; next < int64(len(p.txnGroups)); next++ { + + for next := i; next < int64(len(p.txnGroups)); next++ { if !completed[next] { if next > i { i = next @@ -453,9 +539,10 @@ func (p *accountPrefetcher) prefetch(ctx context.Context) { TxnGroup: p.txnGroups[next], Accounts: groupsReady[next].balances, Resources: groupsReady[next].resources, + KVs: groupsReady[next].kvs, } } - // if we get to this point, it means that we have no more transaction to process. + // if we get to this point, it means that we have no more groups to process. break } } @@ -467,14 +554,21 @@ func (gt *groupTask) markCompletionAcct(idx int, br LoadedAccountDataEntry, grou } } -func (gt *groupTask) markCompletionResource(idx int, res LoadedResourcesEntry, groupDoneCh chan groupTaskDone) { +func (gt *groupTask) markCompletionResource(idx int, res LoadedResourceEntry, groupDoneCh chan groupTaskDone) { gt.resources[idx] = res if gt.incompleteCount.Add(-1) == 0 { groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex.Load()} } } -func (gt *groupTask) markCompletionAcctError(err error, task *preloaderTask, groupDoneCh chan groupTaskDone) { +func (gt *groupTask) markCompletionKv(idx int, kv LoadedKVEntry, groupDoneCh chan groupTaskDone) { + gt.kvs[idx] = kv + if gt.incompleteCount.Add(-1) == 0 { + groupDoneCh <- groupTaskDone{groupIdx: gt.groupTaskIndex.Load()} + } +} + +func (gt *groupTask) markCompletionError(err error, task *preloaderTask, groupDoneCh chan groupTaskDone) { for { curVal := gt.incompleteCount.Load() if curVal <= 0 { @@ -491,9 +585,8 @@ func (gt *groupTask) markCompletionAcctError(err error, task *preloaderTask, gro } } -func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *atomic.Int64, groupDoneCh chan groupTaskDone) { +func (p *paysetPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, taskIdx *atomic.Int64, groupDoneCh chan groupTaskDone) { var task *preloaderTask - var err error for { nextTaskIdx := taskIdx.Add(1) queue, task = queue.getTaskAtIndex(int(nextTaskIdx)) @@ -501,84 +594,81 @@ func (p *accountPrefetcher) asyncPrefetchRoutine(queue *preloaderTaskQueue, task // no more tasks. return } + if task.key != "" { + value, err := p.ledger.LookupKv(p.rnd, task.key) + if err != nil { + // notify the channel of the error. + task.groupTask.markCompletionError(err, task, groupDoneCh) + continue + } + br := LoadedKVEntry{ + Key: task.key, + Value: value, + } + task.groupTask.markCompletionKv(task.groupTaskIndex, br, groupDoneCh) + continue + } if task.creatableIndex == 0 { // lookup the account data directly from the ledger. - var acctData ledgercore.AccountData - acctData, _, err = p.ledger.LookupWithoutRewards(p.rnd, *task.address) - // if there was an error.. + acctData, _, err := p.ledger.LookupWithoutRewards(p.rnd, *task.address) if err != nil { - // there was an error loading that entry. - for _, wt := range task.groupTasks { - // notify the channel of the error. - wt.markCompletionAcctError(err, task, groupDoneCh) - } + // notify the channel of the error. + task.groupTask.markCompletionError(err, task, groupDoneCh) continue } br := LoadedAccountDataEntry{ Address: task.address, Data: &acctData, } - // update all the group tasks with the new acquired balance. - for i, wt := range task.groupTasks { - wt.markCompletionAcct(task.groupTasksIndices[i], br, groupDoneCh) - } + task.groupTask.markCompletionAcct(task.groupTaskIndex, br, groupDoneCh) continue } if task.address == nil { // start off by figuring out the creator in case it's a global resource. - var creator basics.Address - var ok bool - creator, ok, err = p.ledger.GetCreatorForRound(p.rnd, task.creatableIndex, task.creatableType) + creator, ok, err := p.ledger.GetCreatorForRound(p.rnd, task.creatableIndex, task.creatableType) if err != nil { // there was an error loading that entry. - for _, wt := range task.groupTasks { - // notify the channel of the error. - wt.markCompletionAcctError(err, task, groupDoneCh) - } + task.groupTask.markCompletionError(err, task, groupDoneCh) continue } if !ok { - re := LoadedResourcesEntry{ + re := LoadedResourceEntry{ CreatableIndex: task.creatableIndex, CreatableType: task.creatableType, } - // update all the group tasks with the new acquired balance. - for i, wt := range task.groupTasks { - wt.markCompletionResource(task.groupTasksIndices[i], re, groupDoneCh) - } + task.groupTask.markCompletionResource(task.groupTaskIndex, re, groupDoneCh) continue } task.address = &creator } var resource ledgercore.AccountResource if task.creatableType == basics.AppCreatable { - var appResource ledgercore.AppResource - appResource, err = p.ledger.LookupApplication(p.rnd, *task.address, basics.AppIndex(task.creatableIndex)) + appResource, err := p.ledger.LookupApplication(p.rnd, *task.address, basics.AppIndex(task.creatableIndex)) + if err != nil { + // notify the channel of the error. + task.groupTask.markCompletionError(err, task, groupDoneCh) + continue + } resource.AppParams = appResource.AppParams resource.AppLocalState = appResource.AppLocalState } else { var assetResource ledgercore.AssetResource - assetResource, err = p.ledger.LookupAsset(p.rnd, *task.address, basics.AssetIndex(task.creatableIndex)) - resource.AssetParams = assetResource.AssetParams - resource.AssetHolding = assetResource.AssetHolding - } - if err != nil { - // there was an error loading that entry. - for _, wt := range task.groupTasks { + assetResource, err := p.ledger.LookupAsset(p.rnd, *task.address, basics.AssetIndex(task.creatableIndex)) + if err != nil { // notify the channel of the error. - wt.markCompletionAcctError(err, task, groupDoneCh) + task.groupTask.markCompletionError(err, task, groupDoneCh) + continue } - continue + resource.AssetParams = assetResource.AssetParams + resource.AssetHolding = assetResource.AssetHolding } - re := LoadedResourcesEntry{ + re := LoadedResourceEntry{ Resource: &resource, Address: task.address, CreatableIndex: task.creatableIndex, CreatableType: task.creatableType, } - // update all the group tasks with the new acquired balance. - for i, wt := range task.groupTasks { - wt.markCompletionResource(task.groupTasksIndices[i], re, groupDoneCh) - } + // update the group task with the new acquired balance. + task.groupTask.markCompletionResource(task.groupTaskIndex, re, groupDoneCh) } } diff --git a/ledger/eval/prefetcher/prefetcher_alignment_test.go b/ledger/eval/prefetcher/prefetcher_alignment_test.go index 7cb43ebb49..75c2aa7b56 100644 --- a/ledger/eval/prefetcher/prefetcher_alignment_test.go +++ b/ledger/eval/prefetcher/prefetcher_alignment_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/algorand/avm-abi/apps" "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/config" @@ -35,6 +36,7 @@ import ( "github.com/algorand/go-algorand/data/committee" "github.com/algorand/go-algorand/data/stateproofmsg" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/ledger/eval" "github.com/algorand/go-algorand/ledger/eval/prefetcher" "github.com/algorand/go-algorand/ledger/ledgercore" @@ -78,11 +80,13 @@ type prefetcherAlignmentTestLedger struct { apps map[basics.Address]map[basics.AppIndex]ledgercore.AppResource assets map[basics.Address]map[basics.AssetIndex]ledgercore.AssetResource creators map[basics.CreatableIndex]basics.Address + kvs map[string][]byte requestedBalances map[basics.Address]struct{} requestedApps map[basics.Address]map[basics.AppIndex]struct{} requestedAssets map[basics.Address]map[basics.AssetIndex]struct{} requestedCreators map[creatable]struct{} + requestedKvs map[string]struct{} // Protects requested* variables. mu deadlock.Mutex @@ -171,7 +175,17 @@ func (l *prefetcherAlignmentTestLedger) LookupAsset(rnd basics.Round, addr basic } func (l *prefetcherAlignmentTestLedger) LookupKv(rnd basics.Round, key string) ([]byte, error) { - panic("not implemented") + l.mu.Lock() + if l.requestedKvs == nil { + l.requestedKvs = make(map[string]struct{}) + } + l.requestedKvs[key] = struct{}{} + l.mu.Unlock() + + if value, has := l.kvs[key]; has { + return value, nil + } + return nil, nil } func (l *prefetcherAlignmentTestLedger) GetCreatorForRound(_ basics.Round, cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) { @@ -214,7 +228,19 @@ func parseLoadedAccountDataEntries(loadedAccountDataEntries []prefetcher.LoadedA return res } -func parseLoadedResourcesEntries(loadedResourcesEntries []prefetcher.LoadedResourcesEntry) (apps map[basics.Address]map[basics.AppIndex]struct{}, assets map[basics.Address]map[basics.AssetIndex]struct{}, creators map[creatable]struct{}) { +func parseLoadedKVEntries(loadedKVEntries []prefetcher.LoadedKVEntry) map[string]struct{} { + if len(loadedKVEntries) == 0 { + return nil + } + + res := make(map[string]struct{}) + for _, e := range loadedKVEntries { + res[e.Key] = struct{}{} + } + return res +} + +func parseLoadedResourcesEntries(loadedResourcesEntries []prefetcher.LoadedResourceEntry) (apps map[basics.Address]map[basics.AppIndex]struct{}, assets map[basics.Address]map[basics.AssetIndex]struct{}, creators map[creatable]struct{}) { for _, e := range loadedResourcesEntries { cr := creatable{ cindex: e.CreatableIndex, @@ -283,11 +309,12 @@ type ledgerData struct { Apps map[basics.Address]map[basics.AppIndex]struct{} Assets map[basics.Address]map[basics.AssetIndex]struct{} Creators map[creatable]struct{} + KVs map[string]struct{} } // pretend adds the `before` addresses to the Accounts. It "pretends" that the // addresses were prefetched, so we can get agreement with what was actually -// requested. We do this to include two addresses that are going to end up +// requested. We do this to include the rewards pool which is going to end up // requested *before* prefetch is even attempted. So there's no point in // PrefetchAccounts being modified to return them, they have been "prefetched" // simply by accessing them. @@ -300,7 +327,7 @@ func (ld *ledgerData) pretend(before ...basics.Address) { func prefetch(t *testing.T, l prefetcher.Ledger, txn transactions.Transaction) ledgerData { group := makeGroupFromTxn(txn) - ch := prefetcher.PrefetchAccounts( + ch := prefetcher.BlockReferences( context.Background(), l, 1, [][]transactions.SignedTxnWithAD{group}, feeSink(), config.Consensus[proto]) @@ -315,12 +342,14 @@ func prefetch(t *testing.T, l prefetcher.Ledger, txn transactions.Transaction) l accounts := parseLoadedAccountDataEntries(loaded.Accounts) apps, assets, creators := parseLoadedResourcesEntries(loaded.Resources) + kvs := parseLoadedKVEntries(loaded.KVs) return ledgerData{ Accounts: accounts, Apps: apps, Assets: assets, Creators: creators, + KVs: kvs, } } @@ -343,6 +372,7 @@ func run(t *testing.T, l *prefetcherAlignmentTestLedger, txn transactions.Transa l.requestedApps = nil l.requestedAssets = nil l.requestedCreators = nil + l.requestedKvs = nil runEval(t, l, txn) requestedData := ledgerData{ @@ -350,6 +380,7 @@ func run(t *testing.T, l *prefetcherAlignmentTestLedger, txn transactions.Transa Apps: l.requestedApps, Assets: l.requestedAssets, Creators: l.requestedCreators, + KVs: l.requestedKvs, } return requestedData, prefetched @@ -784,7 +815,7 @@ func TestEvaluatorPrefetcherAlignmentAssetClawback(t *testing.T) { func TestEvaluatorPrefetcherAlignmentAssetFreeze(t *testing.T) { partitiontest.PartitionTest(t) - assetID := basics.AssetIndex(5) + const assetID = 5 l := &prefetcherAlignmentTestLedger{ balances: map[basics.Address]ledgercore.AccountData{ rewardsPool(): { @@ -828,12 +859,12 @@ func TestEvaluatorPrefetcherAlignmentAssetFreeze(t *testing.T) { }, }, creators: map[basics.CreatableIndex]basics.Address{ - basics.CreatableIndex(assetID): makeAddress(1), + assetID: makeAddress(1), }, } txn := transactions.Transaction{ - Type: protocol.AssetTransferTx, + Type: protocol.AssetFreezeTx, Header: transactions.Header{ Sender: makeAddress(2), GenesisHash: genesisHash(), @@ -1250,7 +1281,19 @@ func TestEvaluatorPrefetcherAlignmentApplicationCallAccountsDeclaration(t *testi func TestEvaluatorPrefetcherAlignmentApplicationCallForeignAppsDeclaration(t *testing.T) { partitiontest.PartitionTest(t) - appID := basics.AppIndex(5) + const appID = 1115 + + // We're going to access app1 and app2 to match the prefetcher's assumption + // that it should prefetch apps + ops, err := logic.AssembleString(`#pragma version 5 +int 1; byte "A" +app_global_get_ex; pop; pop +int 2; byte "A" +app_global_get_ex; pop; pop +int 1`) + require.NoError(t, err) + getGlobals := ops.Program + l := &prefetcherAlignmentTestLedger{ balances: map[basics.Address]ledgercore.AccountData{ rewardsPool(): { @@ -1276,7 +1319,21 @@ func TestEvaluatorPrefetcherAlignmentApplicationCallForeignAppsDeclaration(t *te makeAddress(1): { appID: { AppParams: &basics.AppParams{ - ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ApprovalProgram: getGlobals, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + 1116: { + AppParams: &basics.AppParams{ + ApprovalProgram: getGlobals, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + 1118: { + AppParams: &basics.AppParams{ + ApprovalProgram: getGlobals, ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, }, AppLocalState: &basics.AppLocalState{}, @@ -1289,7 +1346,9 @@ func TestEvaluatorPrefetcherAlignmentApplicationCallForeignAppsDeclaration(t *te }, }, creators: map[basics.CreatableIndex]basics.Address{ - basics.CreatableIndex(appID): makeAddress(1), + appID: makeAddress(1), + 1116: makeAddress(1), + 1118: makeAddress(1), }, } @@ -1301,17 +1360,17 @@ func TestEvaluatorPrefetcherAlignmentApplicationCallForeignAppsDeclaration(t *te }, ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ ApplicationID: appID, - ForeignApps: []basics.AppIndex{6, 8}, + ForeignApps: []basics.AppIndex{1116, 1118}, }, } requested, prefetched := run(t, l, txn) prefetched.pretend(rewardsPool()) - // Foreign apps are not loaded, ensure they are not prefetched - require.NotContains(t, prefetched.Creators, creatable{cindex: 6, ctype: basics.AppCreatable}) - require.NotContains(t, prefetched.Creators, creatable{cindex: 8, ctype: basics.AppCreatable}) - require.Equal(t, requested, prefetched) + // Foreign apps are loaded, ensure they are prefetched + require.Contains(t, prefetched.Creators, creatable{cindex: 1116, ctype: basics.AppCreatable}) + require.Contains(t, prefetched.Creators, creatable{cindex: 1118, ctype: basics.AppCreatable}) + require.Equal(t, requested, prefetched) // Need to make the byte code use those app globals } func TestEvaluatorPrefetcherAlignmentApplicationCallForeignAssetsDeclaration(t *testing.T) { @@ -1478,3 +1537,367 @@ func TestEvaluatorPrefetcherAlignmentHeartbeat(t *testing.T) { prefetched.pretend(rewardsPool()) require.Equal(t, requested, prefetched) } + +func TestEvaluatorPrefetcherAlignmentApplicationCallWithBoxReference(t *testing.T) { + partitiontest.PartitionTest(t) + + appID := basics.AppIndex(5) + boxName := []byte("test-box") + boxKey := apps.MakeBoxKey(uint64(appID), string(boxName)) + boxValue := []byte("test-value") + + l := &prefetcherAlignmentTestLedger{ + balances: map[basics.Address]ledgercore.AccountData{ + rewardsPool(): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1234567890}, + }, + }, + makeAddress(1): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000001}, + TotalAppParams: 1, + TotalAppLocalStates: 1, + }, + }, + makeAddress(2): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000002}, + TotalAppLocalStates: 1, + }, + }, + }, + apps: map[basics.Address]map[basics.AppIndex]ledgercore.AppResource{ + makeAddress(1): { + appID: { + AppParams: &basics.AppParams{ + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + }, + makeAddress(2): { + appID: { + AppLocalState: &basics.AppLocalState{}, + }, + }, + }, + creators: map[basics.CreatableIndex]basics.Address{ + basics.CreatableIndex(appID): makeAddress(1), + }, + kvs: map[string][]byte{ + boxKey: boxValue, + }, + } + + txn := transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(2), + GenesisHash: genesisHash(), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: appID, + Boxes: []transactions.BoxRef{ + { + Index: 0, // refers to the ApplicationID + Name: boxName, + }, + }, + }, + } + + requested, prefetched := run(t, l, txn) + + prefetched.pretend(rewardsPool()) + require.Equal(t, requested, prefetched) +} + +func TestEvaluatorPrefetcherAlignmentApplicationCallWithMultipleBoxes(t *testing.T) { + partitiontest.PartitionTest(t) + + appID := basics.AppIndex(5) + box1Name := []byte("box1") + box2Name := []byte("box2") + box1Key := apps.MakeBoxKey(uint64(appID), string(box1Name)) + box2Key := apps.MakeBoxKey(uint64(appID), string(box2Name)) + box1Value := []byte("value1") + box2Value := []byte("value2") + + l := &prefetcherAlignmentTestLedger{ + balances: map[basics.Address]ledgercore.AccountData{ + rewardsPool(): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1234567890}, + }, + }, + makeAddress(1): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000001}, + TotalAppParams: 1, + TotalAppLocalStates: 1, + }, + }, + makeAddress(2): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000002}, + TotalAppLocalStates: 1, + }, + }, + }, + apps: map[basics.Address]map[basics.AppIndex]ledgercore.AppResource{ + makeAddress(1): { + appID: { + AppParams: &basics.AppParams{ + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + }, + makeAddress(2): { + appID: { + AppLocalState: &basics.AppLocalState{}, + }, + }, + }, + creators: map[basics.CreatableIndex]basics.Address{ + basics.CreatableIndex(appID): makeAddress(1), + }, + kvs: map[string][]byte{ + box1Key: box1Value, + box2Key: box2Value, + }, + } + + txn := transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(2), + GenesisHash: genesisHash(), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: appID, + Boxes: []transactions.BoxRef{ + { + Index: 0, + Name: box1Name, + }, + { + Index: 0, + Name: box2Name, + }, + }, + }, + } + + requested, prefetched := run(t, l, txn) + + prefetched.pretend(rewardsPool()) + require.Equal(t, requested, prefetched) +} + +func TestEvaluatorPrefetcherAlignmentApplicationCallWithNonExistentBox(t *testing.T) { + partitiontest.PartitionTest(t) + + appID := basics.AppIndex(5) + boxName := []byte("nonexistent-box") + + l := &prefetcherAlignmentTestLedger{ + balances: map[basics.Address]ledgercore.AccountData{ + rewardsPool(): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1234567890}, + }, + }, + makeAddress(1): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000001}, + TotalAppParams: 1, + TotalAppLocalStates: 1, + }, + }, + makeAddress(2): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000002}, + TotalAppLocalStates: 1, + }, + }, + }, + apps: map[basics.Address]map[basics.AppIndex]ledgercore.AppResource{ + makeAddress(1): { + appID: { + AppParams: &basics.AppParams{ + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + }, + makeAddress(2): { + appID: { + AppLocalState: &basics.AppLocalState{}, + }, + }, + }, + creators: map[basics.CreatableIndex]basics.Address{ + basics.CreatableIndex(appID): makeAddress(1), + }, + kvs: map[string][]byte{ + // No box data - testing non-existent box + }, + } + + txn := transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(2), + GenesisHash: genesisHash(), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: appID, + Boxes: []transactions.BoxRef{ + { + Index: 0, + Name: boxName, + }, + }, + }, + } + + requested, prefetched := run(t, l, txn) + + prefetched.pretend(rewardsPool()) + require.Equal(t, requested, prefetched) +} + +func TestEvaluatorPrefetcherAlignmentApplicationCallWithForeignAppBox(t *testing.T) { + partitiontest.PartitionTest(t) + + const appID = 1115 + const foreignAppID = 1110 + boxName := []byte("foreign-app-box") + boxKey := apps.MakeBoxKey(foreignAppID, string(boxName)) + boxValue := []byte("foreign-value") + + // We're going to access app1's globals to match our prefetcher's expectation that apps get accessed. + ops, err := logic.AssembleString(`#pragma version 10 +int 1 +byte "A" +app_global_get_ex +pop +pop +int 1`) + require.NoError(t, err) + globalApp1 := ops.Program + + l := &prefetcherAlignmentTestLedger{ + balances: map[basics.Address]ledgercore.AccountData{ + rewardsPool(): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1234567890}, + }, + }, + makeAddress(1): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000001}, + TotalAppParams: 1, + TotalAppLocalStates: 1, + }, + }, + makeAddress(2): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000002}, + TotalAppLocalStates: 1, + }, + }, + makeAddress(3): { + AccountBaseData: ledgercore.AccountBaseData{ + MicroAlgos: basics.MicroAlgos{Raw: 1000003}, + TotalAppParams: 1, + TotalAppLocalStates: 1, + }, + }, + }, + apps: map[basics.Address]map[basics.AppIndex]ledgercore.AppResource{ + makeAddress(1): { + appID: { + AppParams: &basics.AppParams{ + ApprovalProgram: globalApp1, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + }, + makeAddress(2): { + appID: { + AppLocalState: &basics.AppLocalState{}, + }, + }, + makeAddress(3): { + foreignAppID: { + AppParams: &basics.AppParams{ + ApprovalProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + ClearStateProgram: []byte{0x02, 0x20, 0x01, 0x01, 0x22}, + }, + AppLocalState: &basics.AppLocalState{}, + }, + }, + }, + creators: map[basics.CreatableIndex]basics.Address{ + appID: makeAddress(1), + foreignAppID: makeAddress(3), + }, + kvs: map[string][]byte{ + boxKey: boxValue, + }, + } + + txn := transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(2), + GenesisHash: genesisHash(), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: appID, + ForeignApps: []basics.AppIndex{foreignAppID}, + Boxes: []transactions.BoxRef{ + { + Index: 1, // refers to ForeignApps[0] + Name: boxName, + }, + }, + }, + } + + requested, prefetched := run(t, l, txn) + prefetched.pretend(rewardsPool()) + require.Equal(t, requested, prefetched) + + txnWithAccess := transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(2), + GenesisHash: genesisHash(), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: appID, + Access: []transactions.ResourceRef{ + {App: foreignAppID}, + {Box: transactions.BoxRef{ + Index: 1, + Name: boxName, + }}, + }, + }, + } + + requestedWithA, prefetchedWithA := run(t, l, txnWithAccess) + prefetchedWithA.pretend(rewardsPool()) + require.Equal(t, requestedWithA, prefetchedWithA) + + // fetches for the txn with Access should match + require.Equal(t, requestedWithA, requested) +} diff --git a/ledger/eval/prefetcher/prefetcher_test.go b/ledger/eval/prefetcher/prefetcher_test.go index cd7c7f27b2..f8e3fda994 100644 --- a/ledger/eval/prefetcher/prefetcher_test.go +++ b/ledger/eval/prefetcher/prefetcher_test.go @@ -18,6 +18,7 @@ package prefetcher_test import ( "context" + "errors" "testing" "github.com/stretchr/testify/require" @@ -33,18 +34,16 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" ) -func makeAddressPtr(seed int) (o *basics.Address) { - o = new(basics.Address) +func makeAddressPtr(seed int) *basics.Address { + o := new(basics.Address) o[0] = byte(seed) o[1] = byte(seed >> 8) o[2] = byte(seed >> 16) - return + return o } -func makeAddress(addressSeed int) (o basics.Address) { - t := *makeAddressPtr(addressSeed) - copy(o[:], t[:]) - return +func makeAddress(seed int) basics.Address { + return *makeAddressPtr(seed) } // It would be nice to test current and future, but until that change is made, @@ -74,11 +73,13 @@ type prefetcherTestLedger struct { round basics.Round balances map[basics.Address]ledgercore.AccountData creators map[basics.CreatableIndex]basics.Address - errorTriggerAddress map[basics.Address]bool + kvs map[string][]byte + errorTriggerAddress *basics.Address } const errorTriggerCreatableIndex = 1000001 const errorTriggerAssetIndex = 1000002 +const errorTriggerKvName = "BADKV" func (l *prefetcherTestLedger) BlockHdr(basics.Round) (bookkeeping.BlockHeader, error) { return bookkeeping.BlockHeader{}, nil @@ -87,7 +88,7 @@ func (l *prefetcherTestLedger) CheckDup(config.ConsensusParams, basics.Round, ba return nil } func (l *prefetcherTestLedger) LookupWithoutRewards(rnd basics.Round, addr basics.Address) (ledgercore.AccountData, basics.Round, error) { - if _, has := l.errorTriggerAddress[addr]; has { + if l.errorTriggerAddress != nil && *l.errorTriggerAddress == addr { return ledgercore.AccountData{}, l.round, lookupError{} } if data, has := l.balances[addr]; has { @@ -113,6 +114,17 @@ func (l *prefetcherTestLedger) GetCreatorForRound(_ basics.Round, cidx basics.Cr } return basics.Address{}, false, nil } + +func (l *prefetcherTestLedger) LookupKv(_ basics.Round, name string) ([]byte, error) { + if name[11:] == errorTriggerKvName { + return nil, errors.New("error looking up kv") + } + if val, has := l.kvs[name]; has { + return val, nil + } + return nil, nil +} + func (l *prefetcherTestLedger) GenesisHash() crypto.Digest { return crypto.Digest{} } @@ -160,7 +172,7 @@ type loadedResourcesEntryKey struct { creatableType basics.CreatableType } -func convertLoadedResourcesEntries(entries []prefetcher.LoadedResourcesEntry) map[loadedResourcesEntryKey]*ledgercore.AccountResource { +func convertLoadedResourcesEntries(entries []prefetcher.LoadedResourceEntry) map[loadedResourcesEntryKey]*ledgercore.AccountResource { res := make(map[loadedResourcesEntryKey]*ledgercore.AccountResource) for _, e := range entries { @@ -179,7 +191,8 @@ func convertLoadedResourcesEntries(entries []prefetcher.LoadedResourcesEntry) ma return res } -func compareLoadedResourcesEntries(t *testing.T, expected []prefetcher.LoadedResourcesEntry, actual []prefetcher.LoadedResourcesEntry) { +func compareLoadedResourcesEntries(t *testing.T, expected []prefetcher.LoadedResourceEntry, actual []prefetcher.LoadedResourceEntry) { + t.Helper() expectedForTest := convertLoadedResourcesEntries(expected) actualForTest := convertLoadedResourcesEntries(actual) require.Equal(t, expectedForTest, actualForTest) @@ -188,10 +201,10 @@ func compareLoadedResourcesEntries(t *testing.T, expected []prefetcher.LoadedRes func getPrefetcherTestLedger(rnd basics.Round) *prefetcherTestLedger { var ledger = &prefetcherTestLedger{ - round: rnd, - balances: make(map[basics.Address]ledgercore.AccountData), - creators: make(map[basics.CreatableIndex]basics.Address), - errorTriggerAddress: make(map[basics.Address]bool), + round: rnd, + balances: make(map[basics.Address]ledgercore.AccountData), + creators: make(map[basics.CreatableIndex]basics.Address), + kvs: make(map[string][]byte), } ledger.balances[makeAddress(1)] = ledgercore.AccountData{ AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, @@ -214,7 +227,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { skip bool signedTxn transactions.SignedTxn accounts []prefetcher.LoadedAccountDataEntry - resources []prefetcher.LoadedResourcesEntry + resources []prefetcher.LoadedResourceEntry } testCases := []testCase{ @@ -286,7 +299,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, }, }, - resources: []prefetcher.LoadedResourcesEntry{ + resources: []prefetcher.LoadedResourceEntry{ { Address: nil, CreatableIndex: 1000, @@ -322,7 +335,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, }, }, - resources: []prefetcher.LoadedResourcesEntry{ + resources: []prefetcher.LoadedResourceEntry{ { Address: makeAddressPtr(2), CreatableIndex: 1001, @@ -362,7 +375,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, }, }, - resources: []prefetcher.LoadedResourcesEntry{ + resources: []prefetcher.LoadedResourceEntry{ { Address: makeAddressPtr(2), CreatableIndex: 1001, @@ -413,7 +426,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, }, }, - resources: []prefetcher.LoadedResourcesEntry{ + resources: []prefetcher.LoadedResourceEntry{ { Address: makeAddressPtr(2), CreatableIndex: 1001, @@ -455,16 +468,10 @@ func TestEvaluatorPrefetcher(t *testing.T) { AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, }, }, - { - Address: makeAddressPtr(3), - Data: &ledgercore.AccountData{ - AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, - }, - }, }, - resources: []prefetcher.LoadedResourcesEntry{ + resources: []prefetcher.LoadedResourceEntry{ { - Address: makeAddressPtr(2), + Address: makeAddressPtr(2), // The creator of 1001 in the test ledger CreatableIndex: 1001, CreatableType: basics.AssetCreatable, Resource: &ledgercore.AccountResource{}, @@ -529,14 +536,15 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, */ }, - resources: []prefetcher.LoadedResourcesEntry{ - /* - if we'll decide that we want to prefetch the foreign apps/assets, then this should be enabled + resources: []prefetcher.LoadedResourceEntry{ + /* - if we'll decide that we want to prefetch the foreign assets, then this should be enabled { Address: makeAddressPtr(2), CreatableIndex: 1001, CreatableType: basics.AssetCreatable, Resource: &ledgercore.AccountResource{}, }, + */ { Address: makeAddressPtr(15), CreatableIndex: 2001, @@ -549,7 +557,6 @@ func TestEvaluatorPrefetcher(t *testing.T) { CreatableType: basics.AppCreatable, Resource: nil, }, - */ /* - if we'll decide that we want to prefetch the account local state, then this should be enabled. { address: acctAddrPtr(1), @@ -565,6 +572,95 @@ func TestEvaluatorPrefetcher(t *testing.T) { }, }, }, + { + name: "application transaction using access", + signedTxn: transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(1), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: 10, + Access: []transactions.ResourceRef{ + {Address: makeAddress(4)}, + {Address: makeAddress(5)}, + {Address: makeAddress(6)}, + {App: 2001}, + {App: 2002}, + {Asset: 1001}, + {Asset: 1002}, + // Since we have this cross product we won't prefetch the Address or Asset in it. + {Holding: transactions.HoldingRef{ + Address: 3, + Asset: 7, + }}, + }, + }, + }, + }, + accounts: []prefetcher.LoadedAccountDataEntry{ + { + Address: &feeSinkAddr, + Data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + Address: makeAddressPtr(1), + Data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 100000000}}, + }, + }, + // 4 and 5 are here because when using Access, if accounts are + // not used in a cross product, they are assumed prefetch + // worthy. + { + Address: makeAddressPtr(4), + Data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + { + Address: makeAddressPtr(5), + Data: &ledgercore.AccountData{ + AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 0}}, + }, + }, + }, + resources: []prefetcher.LoadedResourceEntry{ + { // Since 1001 is not used in a cross product, it is prefetched + Address: makeAddressPtr(2), // 2 is the creator for 1001 + CreatableIndex: 1001, + CreatableType: basics.AssetCreatable, + Resource: &ledgercore.AccountResource{}, + }, + { + Address: makeAddressPtr(15), // 15 is the creator for 2001 + CreatableIndex: 2001, + CreatableType: basics.AppCreatable, + Resource: &ledgercore.AccountResource{}, + }, + { + Address: nil, + CreatableIndex: 2002, + CreatableType: basics.AppCreatable, + Resource: nil, + }, + { + Address: nil, + CreatableIndex: 10, + CreatableType: basics.AppCreatable, + Resource: nil, + }, + { // Fetch the explicit cross-product resource + Address: makeAddressPtr(6), + CreatableIndex: 1002, + CreatableType: basics.AssetCreatable, + Resource: &ledgercore.AccountResource{}, + }, + }, + }, } for _, testCase := range testCases { @@ -576,7 +672,7 @@ func TestEvaluatorPrefetcher(t *testing.T) { groups[0] = make([]transactions.SignedTxnWithAD, 1) groups[0][0].SignedTxn = testCase.signedTxn - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) loadedTxnGroup, ok := <-preloadedTxnGroupsCh require.True(t, ok) @@ -628,18 +724,72 @@ func TestAssetLookupError(t *testing.T) { } } - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) receivedNumGroups := 0 for loadedTxnGroup := range preloadedTxnGroupsCh { receivedNumGroups++ - if loadedTxnGroup.Err != nil { + if err := loadedTxnGroup.Err; err != nil { errorReceived = true - require.Equal(t, int64(2), loadedTxnGroup.Err.GroupIdx) - require.ErrorIs(t, loadedTxnGroup.Err, assetLookupError{}) - require.Equal(t, makeAddress(2), *loadedTxnGroup.Err.Address) - require.Equal(t, errorTriggerAssetIndex, int(loadedTxnGroup.Err.CreatableIndex)) - require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType) + require.ErrorContains(t, err, "prefetch failed for groupIdx 2, address: AIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGFFWAF4, creatableIndex 1000002, creatableType 0, cause: asset lookup error") + } + require.Equal(t, txnPerGroup, len(loadedTxnGroup.TxnGroup)) + } + require.True(t, errorReceived) + require.Equal(t, numGroups, receivedNumGroups) +} + +// Test for error from LookupKV +func TestBoxLookupError(t *testing.T) { + partitiontest.PartitionTest(t) + + const rnd = 5 + var feeSinkAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + ledger := getPrefetcherTestLedger(rnd) + appTxn := + transactions.SignedTxn{ + Txn: transactions.Transaction{ + Type: protocol.ApplicationCallTx, + Header: transactions.Header{ + Sender: makeAddress(1), + }, + ApplicationCallTxnFields: transactions.ApplicationCallTxnFields{ + ApplicationID: 1002, + ForeignApps: []basics.AppIndex{2002}, + Boxes: []transactions.BoxRef{{Index: 1, Name: []byte("GOODKV")}}, + }, + }, + } + + errorReceived := false + const numGroups = 5 + const txnPerGroup = 2 + groups := make([][]transactions.SignedTxnWithAD, numGroups) + for i := 0; i < numGroups; i++ { + groups[i] = make([]transactions.SignedTxnWithAD, txnPerGroup) + for j := 0; j < txnPerGroup; j++ { + groups[i][j].SignedTxn = appTxn + if i == 2 { + // force error in asset lookup in the second txn group only + // (need to asssigned the entire Boxes slice to avoid modifying + // other txns) + groups[i][j].SignedTxn.Txn.ApplicationCallTxnFields.Boxes = []transactions.BoxRef{{ + Index: 1, + Name: []byte("BADKV"), + }} + } + } + } + + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) + + receivedNumGroups := 0 + for loadedTxnGroup := range preloadedTxnGroupsCh { + receivedNumGroups++ + if err := loadedTxnGroup.Err; err != nil { + errorReceived = true + require.ErrorContains(t, err, "prefetch failed for groupIdx 2, kv:") + require.ErrorContains(t, err, "cause: error looking up kv") } require.Equal(t, txnPerGroup, len(loadedTxnGroup.TxnGroup)) } @@ -685,17 +835,14 @@ func TestGetCreatorForRoundError(t *testing.T) { } } } - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) receivedNumGroups := 0 for loadedTxnGroup := range preloadedTxnGroupsCh { receivedNumGroups++ - if loadedTxnGroup.Err != nil { + if err := loadedTxnGroup.Err; err != nil { errorReceived = true - require.ErrorIs(t, loadedTxnGroup.Err, getCreatorError{}) - require.Nil(t, loadedTxnGroup.Err.Address) - require.Equal(t, errorTriggerCreatableIndex, int(loadedTxnGroup.Err.CreatableIndex)) - require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType) + require.ErrorContains(t, err, "prefetch failed for groupIdx 0, address: , creatableIndex 1000001, creatableType 0, cause: get creator error") } require.Equal(t, txnPerGroup, len(loadedTxnGroup.TxnGroup)) } @@ -741,18 +888,15 @@ func TestLookupWithoutRewards(t *testing.T) { } } } - ledger.errorTriggerAddress[createAssetFailedTxn.Txn.Sender] = true - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) + ledger.errorTriggerAddress = &createAssetFailedTxn.Txn.Sender + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd+100, groups, feeSinkAddr, config.Consensus[proto]) receivedNumGroups := 0 for loadedTxnGroup := range preloadedTxnGroupsCh { receivedNumGroups++ - if loadedTxnGroup.Err != nil { + if err := loadedTxnGroup.Err; err != nil { errorReceived = true - require.ErrorIs(t, loadedTxnGroup.Err, lookupError{}) - require.Equal(t, makeAddress(10), *loadedTxnGroup.Err.Address) - require.Equal(t, 0, int(loadedTxnGroup.Err.CreatableIndex)) - require.Equal(t, basics.AssetCreatable, loadedTxnGroup.Err.CreatableType) + require.ErrorContains(t, err, "prefetch failed for groupIdx 0, address: BIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAYVWV3M, creatableIndex 0, creatableType 0, cause: lookup error") } require.Equal(t, txnPerGroup, len(loadedTxnGroup.TxnGroup)) } @@ -777,7 +921,7 @@ func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) { type testTransactionCases struct { signedTxn transactions.SignedTxn accounts []prefetcher.LoadedAccountDataEntry - resources []prefetcher.LoadedResourcesEntry + resources []prefetcher.LoadedResourceEntry } txnGroups := make([][]transactions.SignedTxnWithAD, 20000) @@ -800,7 +944,7 @@ func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) { addr += 2 } } - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd, txnGroups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd, txnGroups, feeSinkAddr, config.Consensus[proto]) groupsCount := 0 addressCount := 0 uniqueAccounts := make(map[basics.Address]bool) @@ -814,8 +958,13 @@ func TestEvaluatorPrefetcherQueueExpansion(t *testing.T) { } require.Equal(t, len(txnGroups), groupsCount) // the +1 below is for the fee sink address. - require.Equal(t, len(txnGroups)*16*3+1, addressCount) require.Equal(t, len(txnGroups)*16*2+1, len(uniqueAccounts)) + // We no longer bother to return all of the addresses used in a group (which + // would be 3 per transaction). Callers don't care about receiving every + // address/resource, they care about receiving any _new_ addr/resource that + // wasn't in a previous transaction group. Previous ones were returned + // earlier and put in their cache. + require.Equal(t, len(uniqueAccounts), addressCount) } func BenchmarkPrefetcherApps(b *testing.B) { @@ -834,15 +983,19 @@ func BenchmarkPrefetcherApps(b *testing.B) { ApplicationID: 10, Accounts: []basics.Address{ makeAddress(grpIdx + txnIdx + 1), - makeAddress(grpIdx + txnIdx + 1), + makeAddress(grpIdx + txnIdx + 2), }, ForeignApps: []basics.AppIndex{ 2001, - 2002, + 2002 + basics.AppIndex(txnIdx), }, ForeignAssets: []basics.AssetIndex{ - 1001, + 1001 + basics.AssetIndex(txnIdx), }, + Boxes: []transactions.BoxRef{{ + Index: 1, + Name: []byte("some name"), + }}, }, }, } @@ -860,7 +1013,7 @@ func BenchmarkPrefetcherApps(b *testing.B) { } b.ResetTimer() - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) for k := range preloadedTxnGroupsCh { require.NoError(b, k.Err) } @@ -898,7 +1051,7 @@ func BenchmarkPrefetcherPayment(b *testing.B) { } b.ResetTimer() - preloadedTxnGroupsCh := prefetcher.PrefetchAccounts(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) + preloadedTxnGroupsCh := prefetcher.BlockReferences(context.Background(), ledger, rnd, groups, feeSinkAddr, config.Consensus[proto]) for k := range preloadedTxnGroupsCh { require.NoError(b, k.Err) } diff --git a/ledger/lruaccts.go b/ledger/lruaccts.go index c344235515..8b49ae8f80 100644 --- a/ledger/lruaccts.go +++ b/ledger/lruaccts.go @@ -166,7 +166,6 @@ func (m *lruAccounts) prune(newSize int) (removed int) { removed++ } - // clear the notFound list - m.notFound = make(map[basics.Address]struct{}, len(m.notFound)) + clear(m.notFound) return } diff --git a/ledger/lruresources.go b/ledger/lruresources.go index 1623b54c79..97070391b7 100644 --- a/ledger/lruresources.go +++ b/ledger/lruresources.go @@ -186,7 +186,6 @@ func (m *lruResources) prune(newSize int) (removed int) { removed++ } - // clear the notFound list - m.notFound = make(map[accountCreatable]struct{}, len(m.notFound)) + clear(m.notFound) return } diff --git a/util/metrics/reporter.go b/util/metrics/reporter.go index 65423691e0..851d54918f 100644 --- a/util/metrics/reporter.go +++ b/util/metrics/reporter.go @@ -26,8 +26,6 @@ import ( "regexp" "strings" "time" - // logging imports metrics so that we can have metrics about logging, which is more important than the four Debug lines we had here logging about metrics. TODO: find a more clever cycle resolution - //"github.com/algorand/go-algorand/logging" ) const ( diff --git a/util/pagedqueue.go b/util/pagedqueue.go new file mode 100644 index 0000000000..51c31f387e --- /dev/null +++ b/util/pagedqueue.go @@ -0,0 +1,166 @@ +// Copyright (C) 2019-2026 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package util + +import "iter" + +// PagedQueue is a linked list of "pages" of entries. It is unsynchronized, so +// `append` must be used single threaded, or externally synchronized. Access +// with `get` can be from multiple threads, after all appends have been +// completed. These are roughly the same rules that a growing slice would have, +// but PagedQueue avoids copying to grow by using a paged approach. Random +// access would pay a penalty to access entries by following the links, but +// processing in FIFO order has negligible extra cost. +// +// The zero value is a valid empty queue; NewPagedQueue is only needed to +// pre-allocate capacity for the first page. +type PagedQueue[T any] struct { + next *PagedQueue[T] + entries []T + baseIdx int +} + +// NewPagedQueue constructs a new PagedQueue that can hold at least count items +// without allocating a new page. +func NewPagedQueue[T any](count int) *PagedQueue[T] { + count = max(count, 4) + return &PagedQueue[T]{ + entries: make([]T, 0, count), + } +} + +// Len returns the number of entries in the queue. +func (pq *PagedQueue[T]) Len() int { + if pq.next != nil { + return pq.next.Len() + } + return pq.baseIdx + len(pq.entries) +} + +// Append places v on the queue, allocating a new page if the current one is +// full, and returning the active page. +func (pq *PagedQueue[T]) Append(v T) *PagedQueue[T] { + // We are at capacity, add a page rather than allow append() to grow the + // slice and perform copies. + if len(pq.entries) == cap(pq.entries) { + if pq.entries == nil { + // We must have a zero value of PagedQueue, no need for a page, just + // allocate an initial slice. + pq.entries = make([]T, 0, 8) + } else { + pq.next = &PagedQueue[T]{ + entries: make([]T, 0, cap(pq.entries)*2), + baseIdx: pq.baseIdx + len(pq.entries), + } + pq = pq.next + } + } + pq.entries = append(pq.entries, v) + return pq +} + +// Get returns an entry at an index. Callers must have a pointer to the page +// that idx is on, or a previous page. It is most efficient to call Get with +// ascending values, constantly updating your local pointer to the returned +// PagedQueue. If idx is too low, Get() panics. If it is too high, the zero +// value is returned. The asymmetry reflects the fact that a low index is +// certainly a programmer error, a high index is a natural result of scanning +// forward. +func (pq *PagedQueue[T]) Get(idx int) (*PagedQueue[T], T) { + localIdx := idx - pq.baseIdx + if len(pq.entries) > localIdx { + return pq, pq.entries[localIdx] + } + if pq.next != nil { + return pq.next.Get(idx) + } + var zero T + return pq, zero +} + +// All returns an iterator over all entries in the queue in insertion order. +func (pq *PagedQueue[T]) All() iter.Seq[T] { + return func(yield func(T) bool) { + for page := pq; page != nil; page = page.next { + for _, t := range page.entries { + if !yield(t) { + return + } + } + } + } +} + +// All2 returns an iterator over all entries in the queue in insertion order, +// yielding the queue-wide index and value of each entry. +func (pq *PagedQueue[T]) All2() iter.Seq2[int, T] { + return func(yield func(int, T) bool) { + for page := pq; page != nil; page = page.next { + for i, t := range page.entries { + if !yield(page.baseIdx+i, t) { + return + } + } + } + } +} + +// Ptr returns a pointer to the entry at idx, along with the page it lives on +// for efficient subsequent calls. Because PagedQueue never relocates entries, +// the pointer remains valid for the lifetime of the queue. If idx is beyond +// the end, nil is returned. If idx is too low, Ptr panics. +func (pq *PagedQueue[T]) Ptr(idx int) (*PagedQueue[T], *T) { + localIdx := idx - pq.baseIdx + if localIdx < len(pq.entries) { + return pq, &pq.entries[localIdx] + } + if pq.next != nil { + return pq.next.Ptr(idx) + } + return pq, nil +} + +// AllPtrs returns an iterator over pointers to all entries in the queue in +// insertion order. Because PagedQueue never relocates entries, the pointers +// remain valid for the lifetime of the queue. +func (pq *PagedQueue[T]) AllPtrs() iter.Seq[*T] { + return func(yield func(*T) bool) { + for page := pq; page != nil; page = page.next { + for i := range page.entries { + if !yield(&page.entries[i]) { + return + } + } + } + } +} + +// AllPtrs2 returns an iterator over all entries in the queue in insertion +// order, yielding the queue-wide index and a pointer to each entry. Because +// PagedQueue never relocates entries, the pointers remain valid for the +// lifetime of the queue. +func (pq *PagedQueue[T]) AllPtrs2() iter.Seq2[int, *T] { + return func(yield func(int, *T) bool) { + for page := pq; page != nil; page = page.next { + for i := range page.entries { + if !yield(page.baseIdx+i, &page.entries[i]) { + return + } + } + } + } +} diff --git a/util/pagedqueue_test.go b/util/pagedqueue_test.go new file mode 100644 index 0000000000..7e92dfaa12 --- /dev/null +++ b/util/pagedqueue_test.go @@ -0,0 +1,360 @@ +// Copyright (C) 2019-2026 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package util + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestPagedQueueNew checks that the constructor enforces a minimum page size of 4. +func TestPagedQueueNew(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + pq := NewPagedQueue[int](10) + require.NotNil(t, pq) + require.Zero(t, pq.Len()) + require.Equal(t, 10, cap(pq.entries)) + + // sizes below 4 are promoted to 4 + small := NewPagedQueue[int](2) + require.Equal(t, 4, cap(small.entries)) + require.Empty(t, small.entries) +} + +// TestPagedQueueAppendAndLen checks that Len tracks the count correctly as items are appended across pages. +func TestPagedQueueAppendAndLen(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + pq := NewPagedQueue[int](4) + for i := 0; i < 10; i++ { + pq = pq.Append(i) + require.Equal(t, i+1, pq.Len()) + } +} + +// TestPagedQueueGet checks retrieval by index starting from the head page. +func TestPagedQueueGet(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 10; i++ { + cur = cur.Append(i) + } + + // retrieve every element using the head page as the starting point + for i := 0; i < 10; i++ { + page, val := head.Get(i) + require.NotNil(t, page) + require.Equal(t, i, val) + } +} + +// TestPagedQueueGetAdvancingPage verifies that callers can advance their page pointer to avoid re-traversal from the head. +func TestPagedQueueGetAdvancingPage(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // Demonstrate that callers can advance their page pointer to avoid + // traversing from the head on every Get. + head := NewPagedQueue[string](4) + cur := head + words := []string{"a", "b", "c", "d", "e", "f", "g", "h"} + for _, w := range words { + cur = cur.Append(w) + } + + page := head + for i, want := range words { + var got string + page, got = page.Get(i) + require.Equal(t, want, got) + } +} + +// TestPagedQueueGetZeroValueBeyondEnd checks that an index past the last entry returns the zero value. +func TestPagedQueueGetZeroValueBeyondEnd(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 3; i++ { + cur = cur.Append(i + 1) + } + + // index past the end returns the zero value + _, val := head.Get(100) + require.Equal(t, 0, val) +} + +// TestPagedQueueGetPanicOnLowIndex checks that asking a non-head page for an index that belongs to an earlier page panics. +func TestPagedQueueGetPanicOnLowIndex(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // Fill two pages so the second page has baseIdx > 0. + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 8; i++ { + cur = cur.Append(i) + } + + // Asking a non-head page for an index that belongs to an earlier page panics + // because localIdx goes negative and entries[-n] is out of bounds. + require.Panics(t, func() { + cur.Get(0) + }) +} + +// TestPagedQueuePtr checks that Ptr returns a stable pointer to the correct entry across page boundaries. +func TestPagedQueuePtr(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 10; i++ { + cur = cur.Append(i) + } + + // collect pointers to all entries, then mutate through them + ptrs := make([]*int, 10) + page := head + for i := range 10 { + page, ptrs[i] = page.Ptr(i) + require.NotNil(t, ptrs[i]) + require.Equal(t, i, *ptrs[i]) + } + + for i, p := range ptrs { + *p = i * 100 + } + + // verify mutations are visible via Get + for i := range 10 { + _, val := head.Get(i) + require.Equal(t, i*100, val) + } +} + +// TestPagedQueuePtrBeyondEnd checks that Ptr returns nil for an out-of-bounds index. +func TestPagedQueuePtrBeyondEnd(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 3; i++ { + cur = cur.Append(i) + } + + _, p := head.Ptr(100) + require.Nil(t, p) +} + +// TestPagedQueuePtrPanicOnLowIndex checks that Ptr panics when the index is below the page's baseIdx. +func TestPagedQueuePtrPanicOnLowIndex(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 8; i++ { + cur = cur.Append(i) + } + + require.Panics(t, func() { cur.Ptr(0) }) +} + +// TestPagedQueueAllPtrs checks that AllPtrs yields stable pointers in insertion order. +func TestPagedQueueAllPtrs(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + const n = 15 + for i := 0; i < n; i++ { + cur = cur.Append(i) + } + + // collect pointers then mutate through them + var ptrs []*int + for p := range head.AllPtrs() { + ptrs = append(ptrs, p) + } + require.Len(t, ptrs, n) + for i, p := range ptrs { + *p = i * 10 + } + for idx, v := range head.All2() { + require.Equal(t, idx*10, v) + } +} + +// TestPagedQueueAllPtrs2 checks that AllPtrs2 yields correct indices and stable pointers across page boundaries. +func TestPagedQueueAllPtrs2(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + const n = 15 + for i := 0; i < n; i++ { + cur = cur.Append(i) + } + + for idx, p := range head.AllPtrs2() { + require.Equal(t, idx, *p) + *p = idx * 10 + } + for idx, v := range head.All2() { + require.Equal(t, idx*10, v) + } +} + +// TestPagedQueueAll checks that the iterator yields all entries in insertion order. +func TestPagedQueueAll(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + const n = 15 + for i := 0; i < n; i++ { + cur = cur.Append(i) + } + + var collected []int + for v := range head.All() { + collected = append(collected, v) + } + require.Len(t, collected, n) + for i := 0; i < n; i++ { + require.Equal(t, i, collected[i]) + } +} + +// TestPagedQueueAllEarlyReturn checks that breaking early from All() works without panicking. +func TestPagedQueueAllEarlyReturn(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 12; i++ { + cur = cur.Append(i) + } + + // stopping early should not panic and should return only the first few items + var collected []int + for v := range head.All() { + collected = append(collected, v) + if len(collected) == 5 { + break + } + } + require.Len(t, collected, 5) + for i := 0; i < 5; i++ { + require.Equal(t, i, collected[i]) + } +} + +// TestPagedQueueAll2 checks that All2 yields correct queue-wide indices and values across page boundaries. +func TestPagedQueueAll2(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + const n = 15 + for i := 0; i < n; i++ { + cur = cur.Append(i * 10) + } + + for idx, v := range head.All2() { + require.Equal(t, idx*10, v) + } +} + +// TestPagedQueueAll2EarlyReturn checks that breaking early from All2() works without panicking. +func TestPagedQueueAll2EarlyReturn(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 12; i++ { + cur = cur.Append(i) + } + + var collected []int + for idx, v := range head.All2() { + require.Equal(t, idx, v) + collected = append(collected, v) + if len(collected) == 5 { + break + } + } + require.Len(t, collected, 5) +} + +// TestPagedQueuePageGrowth checks that pages double in size and all values survive across many page boundaries. +func TestPagedQueuePageGrowth(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + // Each new page doubles in size relative to the previous one. + head := NewPagedQueue[int](4) + cur := head + for i := 0; i < 100; i++ { + cur = cur.Append(i) + } + + require.Equal(t, 100, head.Len()) + + // All values must be retrievable in order. + var collected []int + for v := range head.All() { + collected = append(collected, v) + } + require.Len(t, collected, 100) + for i, v := range collected { + require.Equal(t, i, v) + } +} + +// TestPagedQueueEmptyAll checks that iterating an empty queue produces no values. +func TestPagedQueueEmptyAll(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + head := NewPagedQueue[int](8) + var count int + for range head.All() { + count++ + } + require.Equal(t, 0, count) +}