Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ledger/acctdeltas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
normalizedAccountBalances, err := prepareNormalizedBalancesV6(chunk.Balances, proto)
require.NoError(b, err)
b.StartTimer()
err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = l.trackerDBs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
crw := store.NewCatchpointSQLReaderWriter(tx)
err = crw.WriteCatchpointStagingBalances(ctx, normalizedAccountBalances)
return
Expand All @@ -937,7 +937,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo
last64KDuration := time.Since(last64KStart) - last64KAccountCreationTime
fmt.Printf("%-82s%-7d (last 64k) %-6d ns/account %d accounts/sec\n", b.Name(), last64KSize, (last64KDuration / time.Duration(last64KSize)).Nanoseconds(), int(float64(last64KSize)/float64(last64KDuration.Seconds())))
}
stats, err := l.trackerDBs.Wdb.Vacuum(context.Background())
stats, err := l.trackerDBs.Vacuum(context.Background())
require.NoError(b, err)
fmt.Printf("%-82sdb fragmentation %.1f%%\n", b.Name(), float32(stats.PagesBefore-stats.PagesAfter)*100/float32(stats.PagesBefore))
b.ReportMetric(float64(b.N)/float64((time.Since(accountsWritingStarted)-accountsGenerationDuration).Seconds()), "accounts/sec")
Expand Down
8 changes: 4 additions & 4 deletions ledger/acctonline.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type cachedOnlineAccount struct {
// onlineAccounts tracks history of online accounts
type onlineAccounts struct {
// Connection to the database.
dbs db.Pair
dbs store.TrackerStore

// Prepared SQL statements for fast accounts DB lookups.
accountsq store.OnlineAccountsReader
Expand Down Expand Up @@ -151,7 +151,7 @@ func (ao *onlineAccounts) initializeFromDisk(l ledgerForTracker, lastBalancesRou
ao.dbs = l.trackerDB()
ao.log = l.trackerLog()

err = ao.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err = ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error {
arw := store.NewAccountsSQLReaderWriter(tx)
var err0 error
var endRound basics.Round
Expand All @@ -175,7 +175,7 @@ func (ao *onlineAccounts) initializeFromDisk(l ledgerForTracker, lastBalancesRou
return
}

ao.accountsq, err = store.OnlineAccountsInitDbQueries(ao.dbs.Rdb.Handle)
ao.accountsq, err = ao.dbs.CreateOnlineAccountsReader()
if err != nil {
return
}
Expand Down Expand Up @@ -815,7 +815,7 @@ func (ao *onlineAccounts) TopOnlineAccounts(rnd basics.Round, voteRnd basics.Rou
var accts map[basics.Address]*ledgercore.OnlineAccount
start := time.Now()
ledgerAccountsonlinetopCount.Inc(nil)
err = ao.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
accts, err = arw.AccountsOnlineTop(rnd, batchOffset, batchSize, genesisProto)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions ledger/acctonline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func commitSyncPartial(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracke
err := lt.prepareCommit(dcc)
require.NoError(t, err)
}
err := ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
for _, lt := range ml.trackers.trackers {
err0 := lt.commitRound(ctx, tx, dcc)
Expand Down Expand Up @@ -807,7 +807,7 @@ func TestAcctOnlineRoundParamsCache(t *testing.T) {

var dbOnlineRoundParams []ledgercore.OnlineRoundParamsData
var endRound basics.Round
err := ao.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
dbOnlineRoundParams, endRound, err = arw.AccountsOnlineRoundParams()
return err
Expand Down Expand Up @@ -1292,7 +1292,7 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) {
// DB has all the required history tho
var dbOnlineRoundParams []ledgercore.OnlineRoundParamsData
var endRound basics.Round
err = oa.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = oa.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snapshot? AccountsOnlineRoundParams is part of accountsV2Reader

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will sort out snapshot, batch, and transaction in the PR I'm working on right now, based on what methods they end up calling.

The PR I'm working on gets rid of that tx *sql.Tx in the callbacks in favor of using interfaces with the methods we have defined already.

arw := store.NewAccountsSQLReaderWriter(tx)
dbOnlineRoundParams, endRound, err = arw.AccountsOnlineRoundParams()
return err
Expand Down Expand Up @@ -1680,7 +1680,7 @@ func TestAcctOnlineTopDBBehindMemRound(t *testing.T) {
go func() {
time.Sleep(2 * time.Second)
// tweak the database to move backwards
err = oa.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = oa.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
_, err = tx.Exec("update acctrounds set rnd = 1 WHERE id='acctbase' ")
return
})
Expand Down
10 changes: 5 additions & 5 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ type modifiedKvValue struct {

type accountUpdates struct {
// Connection to the database.
dbs db.Pair
dbs store.TrackerStore

// Prepared SQL statements for fast accounts DB lookups.
// Optimized reader for fast accounts DB lookups.
accountsq store.AccountsReader

// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
Expand Down Expand Up @@ -928,7 +928,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou

start := time.Now()
ledgerAccountsinitCount.Inc(nil)
err = au.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err = au.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error {
arw := store.NewAccountsSQLReaderWriter(tx)
totals, err0 := arw.AccountsTotals(ctx, false)
if err0 != nil {
Expand All @@ -944,7 +944,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou
return
}

au.accountsq, err = store.AccountsInitDbQueries(au.dbs.Rdb.Handle)
au.accountsq, err = au.dbs.CreateAccountsReader()
if err != nil {
return
}
Expand Down Expand Up @@ -1962,7 +1962,7 @@ func (au *accountUpdates) vacuumDatabase(ctx context.Context) (err error) {
}()

ledgerVacuumCount.Inc(nil)
vacuumStats, err := au.dbs.Wdb.Vacuum(ctx)
vacuumStats, err := au.dbs.Vacuum(ctx)
close(vacuumExitCh)
vacuumLoggingAbort.Wait()

Expand Down
27 changes: 13 additions & 14 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff
var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36}

type mockLedgerForTracker struct {
dbs db.Pair
dbs store.TrackerStore
blocks []blockEntry
deltas []ledgercore.StateDelta
log logging.Logger
Expand Down Expand Up @@ -94,9 +94,8 @@ func setupAccts(niter int) []map[basics.Address]basics.AccountData {
}

func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker {
dbs, fileName := storetesting.DbOpenTest(t, inMemory)
dbs.Rdb.SetLogger(l)
dbs.Wdb.SetLogger(l)
dbs, fileName := store.DbOpenTrackerTest(t, inMemory)
dbs.SetLogger(l)

blocks := randomInitChain(consensusVersion, initialBlocksCount)
deltas := make([]ledgercore.StateDelta, initialBlocksCount)
Expand Down Expand Up @@ -154,7 +153,7 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker {
copy(newLedgerTracker.deltas, ml.deltas)

// calling Vacuum implies flushing the database content to disk..
ml.dbs.Wdb.Vacuum(context.Background())
ml.dbs.Vacuum(context.Background())
// copy the database files.
for _, ext := range []string{"", "-shm", "-wal"} {
bytes, err := os.ReadFile(ml.filename + ext)
Expand All @@ -167,7 +166,7 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker {
dbs.Rdb.SetLogger(dblogger)
dbs.Wdb.SetLogger(dblogger)

newLedgerTracker.dbs = dbs
newLedgerTracker.dbs = store.CreateTrackerSQLStore(dbs)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think go prefers Make/New prefix instead of Create for such functions

return newLedgerTracker
}

Expand Down Expand Up @@ -220,7 +219,7 @@ func (ml *mockLedgerForTracker) BlockHdr(rnd basics.Round) (bookkeeping.BlockHea
return ml.blocks[int(rnd)].block.BlockHeader, nil
}

func (ml *mockLedgerForTracker) trackerDB() db.Pair {
func (ml *mockLedgerForTracker) trackerDB() store.TrackerStore {
return ml.dbs
}

Expand Down Expand Up @@ -264,7 +263,7 @@ func (au *accountUpdates) allBalances(rnd basics.Round) (bals map[basics.Address
return
}

err = au.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err = au.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error {
var err0 error
bals, err0 = accountsAll(tx)
return err0
Expand Down Expand Up @@ -572,7 +571,7 @@ func TestAcctUpdates(t *testing.T) {

// check the account totals.
var dbRound basics.Round
err := ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ml.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
dbRound, err = arw.AccountsRound()
return
Expand All @@ -586,7 +585,7 @@ func TestAcctUpdates(t *testing.T) {

expectedTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardsLevels[dbRound], proto, nil, ledgercore.AccountTotals{})
var actualTotals ledgercore.AccountTotals
err = ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = ml.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
actualTotals, err = arw.AccountsTotals(ctx, false)
return
Expand Down Expand Up @@ -1578,14 +1577,14 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) {
i++
}

err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ml.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
_, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1))
return
})
require.NoError(b, err)
}

err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ml.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
return arw.UpdateAccountsHashRound(ctx, 1)
})
Expand Down Expand Up @@ -2352,7 +2351,7 @@ func TestAcctUpdatesResources(t *testing.T) {

err := au.prepareCommit(dcc)
require.NoError(t, err)
err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
err = au.commitRound(ctx, tx, dcc)
if err != nil {
Expand Down Expand Up @@ -2636,7 +2635,7 @@ func auCommitSync(t *testing.T, rnd basics.Round, au *accountUpdates, ml *mockLe

err := au.prepareCommit(dcc)
require.NoError(t, err)
err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
arw := store.NewAccountsSQLReaderWriter(tx)
err = au.commitRound(ctx, tx, dcc)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion ledger/archival_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger/internal"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/ledger/store"
"github.com/algorand/go-algorand/ledger/store/blockdb"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (wl *wrappedLedger) Latest() basics.Round {
return wl.l.Latest()
}

func (wl *wrappedLedger) trackerDB() db.Pair {
func (wl *wrappedLedger) trackerDB() store.TrackerStore {
return wl.l.trackerDB()
}

Expand Down
50 changes: 25 additions & 25 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ func catchpointStage1Decoder(r io.Reader) (io.ReadCloser, error) {
return snappyReadCloser{snappy.NewReader(r)}, nil
}

type catchpointStore interface {
store.CatchpointWriter
store.CatchpointReader
}

type catchpointTracker struct {
// dbDirectory is the directory where the ledger and block sql file resides as well as the parent directory for the catchup files to be generated
dbDirectory string
Expand All @@ -103,8 +98,8 @@ type catchpointTracker struct {
log logging.Logger

// Connection to the database.
dbs db.Pair
catchpointStore catchpointStore
dbs store.TrackerStore
catchpointStore store.CatchpointReaderWriter

// The last catchpoint label that was written to the database. Should always align with what's in the database.
// note that this is the last catchpoint *label* and not the catchpoint file.
Expand Down Expand Up @@ -216,7 +211,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
}
}

f := func(ctx context.Context, tx *sql.Tx) error {
return ct.dbs.Batch(func(ctx context.Context, tx *sql.Tx) error {
crw := store.NewCatchpointSQLReaderWriter(tx)
err := ct.recordFirstStageInfo(ctx, tx, dbRound, totalKVs, totalAccounts, totalChunks, biggestChunkLen)
if err != nil {
Expand All @@ -225,8 +220,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic

// Clear the db record.
return crw.WriteCatchpointStateUint64(ctx, store.CatchpointStateWritingFirstStageInfo, 0)
}
return ct.dbs.Wdb.Atomic(f)
})
}

// Possibly finish generating first stage catchpoint db record and data file after
Expand Down Expand Up @@ -319,22 +313,25 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Round) (err error) {
ct.log = l.trackerLog()
ct.dbs = l.trackerDB()
ct.catchpointStore = store.NewCatchpointSQLReaderWriter(l.trackerDB().Wdb.Handle)
ct.catchpointStore, err = l.trackerDB().CreateCatchpointReaderWriter()
if err != nil {
return err
}

ct.roundDigest = nil
ct.catchpointDataWriting = 0
// keep these channel closed if we're not generating catchpoint
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
close(ct.catchpointDataSlowWriting)

err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error {
err = ct.dbs.Batch(func(ctx context.Context, tx *sql.Tx) error {
return ct.initializeHashes(ctx, tx, dbRound)
})
if err != nil {
return err
}

ct.accountsq, err = store.AccountsInitDbQueries(ct.dbs.Rdb.Handle)
ct.accountsq, err = ct.dbs.CreateAccountsReader()
if err != nil {
return
}
Expand Down Expand Up @@ -777,9 +774,9 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound
return err
}

err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = ct.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) {
crw := store.NewCatchpointSQLReaderWriter(tx)
err = ct.recordCatchpointFile(ctx, tx, round, relCatchpointFilePath, fileInfo.Size())
err = ct.recordCatchpointFile(ctx, crw, round, relCatchpointFilePath, fileInfo.Size())
if err != nil {
return err
}
Expand Down Expand Up @@ -1090,7 +1087,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
var catchpointWriter *catchpointWriter
start := time.Now()
ledgerGeneratecatchpointCount.Inc(nil)
err = ct.dbs.Rdb.AtomicContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) {
err = ct.dbs.BatchContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in #5583

catchpointWriter, err = makeCatchpointWriter(dbCtx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk)
if err != nil {
return
Expand Down Expand Up @@ -1213,8 +1210,7 @@ func makeCatchpointDataFilePath(accountsRound basics.Round) string {
// after a successful insert operation to the database, it would delete up to 2 old entries, as needed.
// deleting 2 entries while inserting single entry allow us to adjust the size of the backing storage and have the
// database and storage realign.
func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Executable, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) {
crw := store.NewCatchpointSQLReaderWriter(e)
func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, crw store.CatchpointReaderWriter, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) {
if ct.catchpointFileHistoryLength != 0 {
err = crw.StoreCatchpoint(ctx, round, relCatchpointFilePath, "", fileSize)
if err != nil {
Expand Down Expand Up @@ -1257,7 +1253,7 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS
ledgerGetcatchpointCount.Inc(nil)
// TODO: we need to generalize this, check @cce PoC PR, he has something
// somewhat broken for some KVs..
err := ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err := ct.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) {
crw := store.NewCatchpointSQLReaderWriter(tx)
dbFileName, _, fileSize, err = crw.GetCatchpoint(ctx, round)
return
Expand All @@ -1277,8 +1273,11 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS
if os.IsNotExist(err) {
// the database told us that we have this file.. but we couldn't find it.
// delete it from the database.
err := ct.recordCatchpointFile(
context.Background(), ct.dbs.Wdb.Handle, round, "", 0)
crw, err := ct.dbs.CreateCatchpointReaderWriter()
if err != nil {
return nil, err
}
err = ct.recordCatchpointFile(context.Background(), crw, round, "", 0)
if err != nil {
ct.log.Warnf("catchpointTracker.GetCatchpointStream() unable to delete missing catchpoint entry: %v", err)
return nil, err
Expand All @@ -1302,10 +1301,11 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS
// we couldn't get the stat, so just return with the file.
return &readCloseSizer{ReadCloser: file, size: -1}, nil
}

err = ct.recordCatchpointFile(
context.Background(), ct.dbs.Wdb.Handle, round, relCatchpointFilePath,
fileInfo.Size())
crw, err := ct.dbs.CreateCatchpointReaderWriter()
if err != nil {
return nil, err
}
err = ct.recordCatchpointFile(context.Background(), crw, round, relCatchpointFilePath, fileInfo.Size())
if err != nil {
ct.log.Warnf("catchpointTracker.GetCatchpointStream() unable to save missing catchpoint entry: %v", err)
}
Expand Down
Loading