diff --git a/logging/telemetryspec/event.go b/logging/telemetryspec/event.go
index a212037500..34d161bce5 100644
--- a/logging/telemetryspec/event.go
+++ b/logging/telemetryspec/event.go
@@ -99,17 +99,6 @@ type BlockAcceptedEventDetails struct {
VoteBufLen uint64
}
-// TopAccountsEvent event
-const TopAccountsEvent Event = "TopAccounts"
-
-// TopAccountEventDetails contains details for the BlockAcceptedEvent
-type TopAccountEventDetails struct {
- Round uint64
- OnlineAccounts []map[string]interface{}
- OnlineCirculation uint64
- OfflineCirculation uint64
-}
-
// AccountRegisteredEvent event
const AccountRegisteredEvent Event = "AccountRegistered"
diff --git a/node/node.go b/node/node.go
index 45e1e20ab1..c4275a6e6f 100644
--- a/node/node.go
+++ b/node/node.go
@@ -198,8 +198,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
p2pNode.SetPrioScheme(node)
node.net = p2pNode
- accountListener := makeTopAccountListener(log)
-
// load stored data
genesisDir := filepath.Join(rootDir, genesis.ID())
ledgerPathnamePrefix := filepath.Join(genesisDir, config.LedgerFilenamePrefix)
@@ -232,9 +230,6 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
node,
}
- if node.config.EnableTopAccountsReporting {
- blockListeners = append(blockListeners, &accountListener)
- }
node.ledger.RegisterBlockListeners(blockListeners)
txHandlerOpts := data.TxHandlerOpts{
TxPool: node.transactionPool,
diff --git a/node/topAccountListener.go b/node/topAccountListener.go
deleted file mode 100644
index 4b1e50ac69..0000000000
--- a/node/topAccountListener.go
+++ /dev/null
@@ -1,212 +0,0 @@
-// Copyright (C) 2019-2023 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see .
-
-package node
-
-import (
- "sort"
-
- "github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/data/bookkeeping"
- "github.com/algorand/go-algorand/ledger/ledgercore"
- "github.com/algorand/go-algorand/logging"
- "github.com/algorand/go-algorand/logging/telemetryspec"
- "github.com/algorand/go-algorand/protocol"
-)
-
-const numTopAccounts = 20
-
-type topAccountListener struct {
- log logging.Logger
-
- round basics.Round
-
- onlineCirculation basics.MicroAlgos
-
- totalCirculation basics.MicroAlgos
-
- // Cached between rounds to optimize ledger lookups.
- accounts []basics.AccountDetail
-}
-
-func makeTopAccountListener(log logging.Logger) topAccountListener {
- return topAccountListener{
- log: log,
- // TODO: If needed, increase size of this slice to buffer some accounts beyond the TopN.
- accounts: make([]basics.AccountDetail, 0, numTopAccounts),
- }
-}
-
-func (t *topAccountListener) init(balances basics.BalanceDetail) {
- t.round = balances.Round
- t.onlineCirculation = balances.OnlineMoney
- t.totalCirculation = balances.TotalMoney
- t.accounts = t.accounts[:0]
-
- // TODO: After ledger refactor this might be replaced with a loop processing pages of results from a SQL command.
- t.accounts = updateTopAccounts(t.accounts, balances.Accounts)
-}
-
-// BlockListener event, triggered when the ledger writes a new block.
-func (t *topAccountListener) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) {
- // XXX revise for new ledger API
- // t.update(block, balances)
-
- // If number of accounts after update is insufficient, do a full re-init
- if len(t.accounts) < numTopAccounts {
- // XXX revise for new ledger API
- // t.init(balances)
- }
-
- t.sendEvent()
-}
-
-// Account cache update logic here.
-func (t *topAccountListener) update(b bookkeeping.Block, balances basics.BalanceDetail) {
- lastRound := t.round
-
- // Update metadata.
- t.round = balances.Round
- t.onlineCirculation = balances.OnlineMoney
- t.totalCirculation = balances.TotalMoney
-
- // Invalidate accounts if a round is missed (this also causes the accounts to be lazily initialized).
- if lastRound+1 != balances.Round {
- t.accounts = t.accounts[:0]
- return
- }
-
- // No transactions to update.
- if len(balances.Accounts) == 0 {
- return
- }
-
- // Lookup map for updated accounts.
- accountSet := make(map[basics.Address]bool)
-
- payset, err := b.DecodePaysetFlat()
- if err != nil {
- return
- }
-
- for _, txad := range payset {
- tx := txad.SignedTxn
- if tx.Txn.Type == protocol.PaymentTx {
- accountSet[tx.Txn.Receiver] = true
- if tx.Txn.CloseRemainderTo != (basics.Address{}) {
- accountSet[tx.Txn.CloseRemainderTo] = true
- }
- }
- accountSet[tx.Txn.Src()] = true
- }
-
- // TODO: This loop may not be needed with the ledger refactor.
- // Since the balance list currently is unrelated to the transaction list, must iterate balances.
- for _, tx := range balances.Accounts {
- accountSet[tx.Address] = true
- }
-
- // Remove any accounts in the updated accountSet (they'll be merged back if necessary)
- t.accounts = removeSome(t.accounts, func(addr basics.AccountDetail) bool { return accountSet[addr.Address] })
-
- // Grab the smallest record after removing modified accounts
- smallestAccountSize := basics.MicroAlgos{Raw: 0}
- if len(t.accounts) != 0 {
- smallestAccountSize = t.accounts[len(t.accounts)-1].Algos
- }
-
- t.accounts = updateTopAccounts(t.accounts, balances.Accounts)
-
- // Truncate any accounts after the smallest balance.
- // This triggers a full re-init if the length falls below 'numTopAccounts'
- for i, acct := range t.accounts {
- if acct.Algos.LessThan(smallestAccountSize) {
- t.accounts = t.accounts[:i]
- return
- }
- }
-}
-
-// Helper method to defragment a slice using a predicate to identify stale entries.
-func removeSome(slice []basics.AccountDetail, predicate func(basics.AccountDetail) bool) []basics.AccountDetail {
- // Remove updated accounts (they'll be merged back in as necessary)
- next, end := 0, 0
- for (next + end) < len(slice) {
- if predicate(slice[next+end]) {
- end++
- } else {
- slice[next] = slice[next+end]
- next++
- }
- }
-
- return slice[:next]
-}
-
-// Merge largest accounts from balances into topN, removing values from topN as necessary.
-// The underlying capacity will not be modified, but the length may increase.
-// Note: Doesn't check for duplicates.
-func updateTopAccounts(topN []basics.AccountDetail, balances []basics.AccountDetail) []basics.AccountDetail {
- for _, account := range balances {
- balance := account.Algos
-
- // Quick check for topN if capacity is reached.
- if account.Status != basics.Online || len(topN) != 0 && len(topN) == cap(topN) && balance.Raw <= topN[len(topN)-1].Algos.Raw {
- continue
- }
-
- // Find insertion point.
- pos := sort.Search(len(topN), func(i int) bool {
- return topN[i].Algos.LessThan(balance)
- })
-
- // Increase capacity if more space is available.
- if len(topN) < cap(topN) {
- topN = topN[:len(topN)+1]
- }
-
- // Shift upper elements and insert
- if pos < len(topN) {
- copy(topN[pos+1:], topN[pos:])
- topN[pos] = account
- }
- }
-
- return topN
-}
-
-// Compile current top account state into a telemetry event, and send it.
-func (t *topAccountListener) sendEvent() {
- // Build accounts object.
- payload := make([]map[string]interface{}, 0)
- fCirculation := float64(t.onlineCirculation.ToUint64())
- for _, account := range t.accounts[:] {
- entry := make(map[string]interface{})
- entry["address"] = account.Address.String()
- entry["balance"] = account.Algos.ToUint64()
- entry["stake"] = float64(account.Algos.ToUint64()) / fCirculation
- payload = append(payload, entry)
- }
-
- // Send it out
- t.log.EventWithDetails(telemetryspec.Accounts, telemetryspec.TopAccountsEvent,
- telemetryspec.TopAccountEventDetails{
- Round: uint64(t.round),
- OnlineAccounts: payload,
- OnlineCirculation: t.onlineCirculation.ToUint64(),
- OfflineCirculation: t.totalCirculation.ToUint64() - t.onlineCirculation.ToUint64(),
- })
-}
diff --git a/node/topAccountListener_test.go b/node/topAccountListener_test.go
deleted file mode 100644
index 166baab72b..0000000000
--- a/node/topAccountListener_test.go
+++ /dev/null
@@ -1,368 +0,0 @@
-// Copyright (C) 2019-2023 Algorand, Inc.
-// This file is part of go-algorand
-//
-// go-algorand is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as
-// published by the Free Software Foundation, either version 3 of the
-// License, or (at your option) any later version.
-//
-// go-algorand is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-//
-// You should have received a copy of the GNU Affero General Public License
-// along with go-algorand. If not, see .
-
-package node
-
-import (
- "fmt"
- "testing"
-
- "github.com/algorand/go-algorand/crypto"
- "github.com/algorand/go-algorand/data/basics"
- "github.com/algorand/go-algorand/data/bookkeeping"
- "github.com/algorand/go-algorand/data/transactions"
- "github.com/algorand/go-algorand/logging"
- "github.com/algorand/go-algorand/protocol"
- "github.com/algorand/go-algorand/test/partitiontest"
-)
-
-// errorString is a trivial implementation of error.
-type errorString struct {
- s string
-}
-
-func (e *errorString) Error() string {
- return e.s
-}
-
-func TestUpdateTopAccounts(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- var topN []basics.AccountDetail
- var input []basics.AccountDetail
-
- // Empty target array.
- topN = []basics.AccountDetail{}
- input = []basics.AccountDetail{onlineDetail(byte(0), 1), onlineDetail(byte(1), 10)}
- topN = updateTopAccounts(topN, input)
-
- if len(topN) != 0 {
- t.Errorf("Target slice not 0: len(topN) == %d", len(topN))
- }
-
- // Extra space available
- topN = make([]basics.AccountDetail, 0, 20)
- input = []basics.AccountDetail{onlineDetail(byte(0), 1), onlineDetail(byte(1), 10)}
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{10, 1}, topN); err != nil {
- t.Error(err)
- }
-
- // Overflow, unmodified
- topN = make([]basics.AccountDetail, 0, 4)
- input = []basics.AccountDetail{
- onlineDetail(byte(0), 11),
- onlineDetail(byte(1), 12),
- onlineDetail(byte(2), 13),
- onlineDetail(byte(3), 14),
- onlineDetail(byte(4), 1),
- }
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{14, 13, 12, 11}, topN); err != nil {
- t.Error(err)
- }
-
- // Overflow, insert front
- topN = make([]basics.AccountDetail, 0, 4)
- input = []basics.AccountDetail{
- onlineDetail(byte(1), 11),
- onlineDetail(byte(2), 12),
- onlineDetail(byte(3), 13),
- onlineDetail(byte(4), 14),
- onlineDetail(byte(5), 15),
- }
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil {
- t.Error(err)
- }
-
- // Overflow, insert middle
- topN = make([]basics.AccountDetail, 0, 4)
- input = []basics.AccountDetail{
- onlineDetail(byte(1), 11),
- onlineDetail(byte(2), 12),
- onlineDetail(byte(3), 13),
- onlineDetail(byte(4), 15),
- onlineDetail(byte(5), 14),
- }
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil {
- t.Error(err)
- }
-
- // Overflow, insert end
- topN = make([]basics.AccountDetail, 0, 4)
- input = []basics.AccountDetail{
- onlineDetail(byte(1), 11),
- onlineDetail(byte(2), 13),
- onlineDetail(byte(3), 14),
- onlineDetail(byte(4), 15),
- onlineDetail(byte(5), 12),
- }
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil {
- t.Error(err)
- }
-
- // Ignore offline account, shouldn't change topN
- topN = updateTopAccounts(topN, []basics.AccountDetail{detail(byte(6), 200, false)})
- topN = make([]basics.AccountDetail, 0, 4)
- input = []basics.AccountDetail{
- onlineDetail(byte(1), 12),
- onlineDetail(byte(2), 13),
- onlineDetail(byte(3), 14),
- onlineDetail(byte(4), 15),
- detail(byte(5), 200, false),
- }
- topN = updateTopAccounts(topN, input)
-
- if err := verifyAccountBalances([]uint64{15, 14, 13, 12}, topN); err != nil {
- t.Error(err)
- }
-}
-
-func TestRemoveSome(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- // Initialize slice with 100 accounts
- var accountsSlice []basics.AccountDetail
- for i := 0; i <= 100; i++ {
- accountsSlice = append(accountsSlice, onlineDetail(byte(i), 10))
- }
-
- // Remove accounts where the first byte is divisible by 10 (which includes the first and last index
- remove10s := func(details basics.AccountDetail) bool {
- return getInt(details)%10 == 0
- }
-
- accountsSlice = removeSome(accountsSlice, remove10s)
-
- if len(accountsSlice) != 90 {
- t.Errorf("Unexpected size found after removeSome/remove10s: 90 != %d", len(accountsSlice))
- }
- for _, d := range accountsSlice {
- if getInt(d)%10 == 0 {
- t.Errorf("Unexpected value found after removeSome/remove10s: %d", getInt(d))
- }
- }
-
- // Remove remaining accounts where the first byte is even
- removeEven := func(details basics.AccountDetail) bool {
- return getInt(details)%2 == 0
- }
-
- accountsSlice = removeSome(accountsSlice, removeEven)
-
- if len(accountsSlice) != 50 {
- t.Errorf("Unexpected size found after removeSome/removeEven: 50 != %d", len(accountsSlice))
- }
- for _, d := range accountsSlice {
- if getInt(d)%2 == 0 {
- t.Errorf("Unexpected value found after removeSome/removeEven: %d", getInt(d))
- }
- }
-}
-
-func TestUpdate(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- listener := topAccountListener{
- accounts: []basics.AccountDetail{},
- round: 1,
- totalCirculation: basics.MicroAlgos{Raw: 100},
- onlineCirculation: basics.MicroAlgos{Raw: 100},
- }
-
- balanceUpdate := basics.BalanceDetail{
- Accounts: []basics.AccountDetail{},
- Round: 2,
- OnlineMoney: basics.MicroAlgos{Raw: 100000},
- TotalMoney: basics.MicroAlgos{Raw: 1000000},
- }
-
- // Update when accounts is empty.
- listener.update(bookkeeping.Block{}, balanceUpdate)
- if err := verifyListener(listener, []uint64{}, 100000, 1000000, 2); err != nil {
- t.Error(err)
- }
-
- // Transactions causing acct 1 to increase reorders the TopN.
- listener.accounts = []basics.AccountDetail{
- onlineDetail(byte(0), 15),
- onlineDetail(byte(1), 10),
- onlineDetail(byte(2), 5),
- }
- balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 100)}
- balanceUpdate.Round++
- block := makeBlockWithTxnFor([]byte{3}, []byte{1})
-
- listener.update(block, balanceUpdate)
-
- // 10 -> 100.
- if err := verifyListener(listener, []uint64{100, 15, 5}, 100000, 1000000, 3); err != nil {
- t.Error(err)
- }
-
- // Transactions causing acct 1 to decrease and falls off topN truncates result.
- listener.accounts = []basics.AccountDetail{
- onlineDetail(byte(0), 15),
- onlineDetail(byte(1), 10),
- onlineDetail(byte(2), 5),
- }
- balanceUpdate.Round++
- balanceUpdate.TotalMoney = basics.MicroAlgos{Raw: 99999999}
- balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 1)}
- block = makeBlockWithTxnFor([]byte{3}, []byte{1})
- listener.update(block, balanceUpdate)
-
- if err := verifyListener(listener, []uint64{15, 5}, 100000, 99999999, 4); err != nil {
- t.Error(err)
- }
-
- // Transactions causing adding a balance to a new account are not reflected in TopN, because they are smaller than
- // the smallest value in TopN (even though there is capacity for it).
- listener.accounts = make([]basics.AccountDetail, 3, 10)
- listener.accounts[0] = onlineDetail(byte(0), 15)
- listener.accounts[1] = onlineDetail(byte(1), 10)
- listener.accounts[2] = onlineDetail(byte(2), 5)
-
- balanceUpdate.Round++
- balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(3), 1)}
- block = makeBlockWithTxnFor([]byte{5}, []byte{3})
- listener.update(block, balanceUpdate)
-
- if err := verifyListener(listener, []uint64{15, 10, 5}, 100000, 99999999, 5); err != nil {
- t.Error(err)
- }
-
- // Invalid round truncates accounts slice
- listener.update(block, balanceUpdate)
- if len(listener.accounts) != 0 {
- t.Errorf("Accounts should be truncated to zero after unexpected round: len(topN) = %d", len(listener.accounts))
- }
-}
-
-func TestInit(t *testing.T) {
- partitiontest.PartitionTest(t)
-
- listener := makeTopAccountListener(logging.Base())
-
- // "init" should remove existing values before adding new ones.
- balanceUpdate := basics.BalanceDetail{
- Accounts: make([]basics.AccountDetail, 0, 10),
- Round: 2,
- OnlineMoney: basics.MicroAlgos{Raw: 100},
- TotalMoney: basics.MicroAlgos{Raw: 100},
- }
-
- listener.accounts = append(listener.accounts, onlineDetail(byte(10), 100))
- balanceUpdate.Accounts = []basics.AccountDetail{onlineDetail(byte(1), 1)}
-
- listener.init(balanceUpdate)
-
- if err := verifyListener(listener, []uint64{1}, 100, 100, 2); err != nil {
- t.Error(err)
- }
-}
-
-func makeBlockWithTxnFor(senders []byte, receivers []byte) bookkeeping.Block {
- var blk bookkeeping.Block
- blk.BlockHeader.GenesisID = "foo"
- crypto.RandBytes(blk.BlockHeader.GenesisHash[:])
- blk.CurrentProtocol = protocol.ConsensusFuture
-
- paysets := make([]transactions.SignedTxnInBlock, 0, len(receivers))
- for i, b := range receivers {
- txib, err := blk.EncodeSignedTxn(transactions.SignedTxn{
- Txn: transactions.Transaction{
- Type: protocol.PaymentTx,
- Header: transactions.Header{
- Sender: basics.Address{senders[i]},
- GenesisID: blk.BlockHeader.GenesisID,
- GenesisHash: blk.BlockHeader.GenesisHash,
- },
- PaymentTxnFields: transactions.PaymentTxnFields{
- Receiver: basics.Address{b},
- // If this ends up being used by topAccountListener, add it here.
- // Amount: basics.MicroAlgos{123},
- },
- }}, transactions.ApplyData{})
- if err != nil {
- panic(err)
- }
-
- paysets = append(paysets, txib)
- }
-
- blk.Payset = paysets
- return blk
-}
-
-// Helpers for working with data objects.
-func onlineDetail(b byte, bal uint64) basics.AccountDetail {
- return detail(b, bal, true)
-}
-
-func detail(b byte, bal uint64, isOnline bool) basics.AccountDetail {
- state := basics.Offline
- if isOnline {
- state = basics.Online
- }
- return basics.AccountDetail{
- Address: basics.Address{b},
- Algos: basics.MicroAlgos{Raw: bal},
- Status: state,
- }
-}
-
-func getInt(detail basics.AccountDetail) uint64 {
- return uint64([32]byte(detail.Address)[0])
-}
-
-func verifyAccountBalances(expected []uint64, actual []basics.AccountDetail) error {
- if len(expected) != len(actual) {
- return &errorString{fmt.Sprintf("Lengths do not equal: expected(%d) != actual(%d)", len(expected), len(actual))}
- }
-
- for i, a := range actual {
- if expected[i] != a.Algos.Raw {
- return &errorString{fmt.Sprintf("Unexpected result at actual[%d]: expected(%d) != actual(%d)", i, expected[i], a.Algos.Raw)}
- }
- }
-
- return nil
-}
-
-func verifyListener(listener topAccountListener, expected []uint64, online uint64, total uint64, round uint64) error {
- if listener.round != basics.Round(round) {
- return &errorString{fmt.Sprintf("Unexpected round: actual(%d) != expected(%d)", uint64(listener.round), round)}
- }
-
- if listener.onlineCirculation.Raw != online {
- return &errorString{fmt.Sprintf("Unexpected online circulation: actual(%d) != expected(%d)", listener.onlineCirculation.Raw, online)}
- }
-
- if listener.totalCirculation.Raw != total {
- return &errorString{fmt.Sprintf("Unexpected total circulation: actual(%d) != expected(%d)", listener.totalCirculation.Raw, total)}
- }
-
- return verifyAccountBalances(expected, listener.accounts)
-}