diff --git a/ledger/acctdeltas_test.go b/ledger/acctdeltas_test.go index 21ac1e5dcc..017677a96c 100644 --- a/ledger/acctdeltas_test.go +++ b/ledger/acctdeltas_test.go @@ -924,7 +924,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo normalizedAccountBalances, err := prepareNormalizedBalancesV6(chunk.Balances, proto) require.NoError(b, err) b.StartTimer() - err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = l.trackerDBs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) err = crw.WriteCatchpointStagingBalances(ctx, normalizedAccountBalances) return @@ -937,7 +937,7 @@ func benchmarkWriteCatchpointStagingBalancesSub(b *testing.B, ascendingOrder boo last64KDuration := time.Since(last64KStart) - last64KAccountCreationTime fmt.Printf("%-82s%-7d (last 64k) %-6d ns/account %d accounts/sec\n", b.Name(), last64KSize, (last64KDuration / time.Duration(last64KSize)).Nanoseconds(), int(float64(last64KSize)/float64(last64KDuration.Seconds()))) } - stats, err := l.trackerDBs.Wdb.Vacuum(context.Background()) + stats, err := l.trackerDBs.Vacuum(context.Background()) require.NoError(b, err) fmt.Printf("%-82sdb fragmentation %.1f%%\n", b.Name(), float32(stats.PagesBefore-stats.PagesAfter)*100/float32(stats.PagesBefore)) b.ReportMetric(float64(b.N)/float64((time.Since(accountsWritingStarted)-accountsGenerationDuration).Seconds()), "accounts/sec") diff --git a/ledger/acctonline.go b/ledger/acctonline.go index f73b153e24..2cacc0efe7 100644 --- a/ledger/acctonline.go +++ b/ledger/acctonline.go @@ -62,7 +62,7 @@ type cachedOnlineAccount struct { // onlineAccounts tracks history of online accounts type onlineAccounts struct { // Connection to the database. - dbs db.Pair + dbs store.TrackerStore // Prepared SQL statements for fast accounts DB lookups. 
accountsq store.OnlineAccountsReader @@ -151,7 +151,7 @@ func (ao *onlineAccounts) initializeFromDisk(l ledgerForTracker, lastBalancesRou ao.dbs = l.trackerDB() ao.log = l.trackerLog() - err = ao.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error { arw := store.NewAccountsSQLReaderWriter(tx) var err0 error var endRound basics.Round @@ -175,7 +175,7 @@ func (ao *onlineAccounts) initializeFromDisk(l ledgerForTracker, lastBalancesRou return } - ao.accountsq, err = store.OnlineAccountsInitDbQueries(ao.dbs.Rdb.Handle) + ao.accountsq, err = ao.dbs.CreateOnlineAccountsReader() if err != nil { return } @@ -815,7 +815,7 @@ func (ao *onlineAccounts) TopOnlineAccounts(rnd basics.Round, voteRnd basics.Rou var accts map[basics.Address]*ledgercore.OnlineAccount start := time.Now() ledgerAccountsonlinetopCount.Inc(nil) - err = ao.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) accts, err = arw.AccountsOnlineTop(rnd, batchOffset, batchSize, genesisProto) if err != nil { diff --git a/ledger/acctonline_test.go b/ledger/acctonline_test.go index d892c21b67..050785e10a 100644 --- a/ledger/acctonline_test.go +++ b/ledger/acctonline_test.go @@ -81,7 +81,7 @@ func commitSyncPartial(t *testing.T, oa *onlineAccounts, ml *mockLedgerForTracke err := lt.prepareCommit(dcc) require.NoError(t, err) } - err := ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) for _, lt := range ml.trackers.trackers { err0 := lt.commitRound(ctx, tx, dcc) @@ -807,7 +807,7 @@ func TestAcctOnlineRoundParamsCache(t *testing.T) { var dbOnlineRoundParams []ledgercore.OnlineRoundParamsData var endRound basics.Round - err := ao.dbs.Rdb.Atomic(func(ctx 
context.Context, tx *sql.Tx) (err error) { + err := ao.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) dbOnlineRoundParams, endRound, err = arw.AccountsOnlineRoundParams() return err @@ -1292,7 +1292,7 @@ func TestAcctOnlineVotersLongerHistory(t *testing.T) { // DB has all the required history tho var dbOnlineRoundParams []ledgercore.OnlineRoundParamsData var endRound basics.Round - err = oa.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = oa.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) dbOnlineRoundParams, endRound, err = arw.AccountsOnlineRoundParams() return err @@ -1680,7 +1680,7 @@ func TestAcctOnlineTopDBBehindMemRound(t *testing.T) { go func() { time.Sleep(2 * time.Second) // tweak the database to move backwards - err = oa.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = oa.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { _, err = tx.Exec("update acctrounds set rnd = 1 WHERE id='acctbase' ") return }) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index dbaaefca69..dc92684ea0 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -154,9 +154,9 @@ type modifiedKvValue struct { type accountUpdates struct { // Connection to the database. - dbs db.Pair + dbs store.TrackerStore - // Prepared SQL statements for fast accounts DB lookups. + // Optimized reader for fast accounts DB lookups. 
accountsq store.AccountsReader // cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()), @@ -928,7 +928,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou start := time.Now() ledgerAccountsinitCount.Inc(nil) - err = au.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = au.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error { arw := store.NewAccountsSQLReaderWriter(tx) totals, err0 := arw.AccountsTotals(ctx, false) if err0 != nil { @@ -944,7 +944,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou return } - au.accountsq, err = store.AccountsInitDbQueries(au.dbs.Rdb.Handle) + au.accountsq, err = au.dbs.CreateAccountsReader() if err != nil { return } @@ -1962,7 +1962,7 @@ func (au *accountUpdates) vacuumDatabase(ctx context.Context) (err error) { }() ledgerVacuumCount.Inc(nil) - vacuumStats, err := au.dbs.Wdb.Vacuum(ctx) + vacuumStats, err := au.dbs.Vacuum(ctx) close(vacuumExitCh) vacuumLoggingAbort.Wait() diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 723e2c1986..69e4506517 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -51,7 +51,7 @@ var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36} type mockLedgerForTracker struct { - dbs db.Pair + dbs store.TrackerStore blocks []blockEntry deltas []ledgercore.StateDelta log logging.Logger @@ -94,9 +94,8 @@ func setupAccts(niter int) []map[basics.Address]basics.AccountData { } func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker { - dbs, 
fileName := storetesting.DbOpenTest(t, inMemory) - dbs.Rdb.SetLogger(l) - dbs.Wdb.SetLogger(l) + dbs, fileName := store.DbOpenTrackerTest(t, inMemory) + dbs.SetLogger(l) blocks := randomInitChain(consensusVersion, initialBlocksCount) deltas := make([]ledgercore.StateDelta, initialBlocksCount) @@ -154,7 +153,7 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker { copy(newLedgerTracker.deltas, ml.deltas) // calling Vacuum implies flushing the database content to disk.. - ml.dbs.Wdb.Vacuum(context.Background()) + ml.dbs.Vacuum(context.Background()) // copy the database files. for _, ext := range []string{"", "-shm", "-wal"} { bytes, err := os.ReadFile(ml.filename + ext) @@ -167,7 +166,7 @@ func (ml *mockLedgerForTracker) fork(t testing.TB) *mockLedgerForTracker { dbs.Rdb.SetLogger(dblogger) dbs.Wdb.SetLogger(dblogger) - newLedgerTracker.dbs = dbs + newLedgerTracker.dbs = store.CreateTrackerSQLStore(dbs) return newLedgerTracker } @@ -220,7 +219,7 @@ func (ml *mockLedgerForTracker) BlockHdr(rnd basics.Round) (bookkeeping.BlockHea return ml.blocks[int(rnd)].block.BlockHeader, nil } -func (ml *mockLedgerForTracker) trackerDB() db.Pair { +func (ml *mockLedgerForTracker) trackerDB() store.TrackerStore { return ml.dbs } @@ -264,7 +263,7 @@ func (au *accountUpdates) allBalances(rnd basics.Round) (bals map[basics.Address return } - err = au.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = au.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) error { var err0 error bals, err0 = accountsAll(tx) return err0 @@ -572,7 +571,7 @@ func TestAcctUpdates(t *testing.T) { // check the account totals. 
var dbRound basics.Round - err := ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := ml.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) dbRound, err = arw.AccountsRound() return @@ -586,7 +585,7 @@ func TestAcctUpdates(t *testing.T) { expectedTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardsLevels[dbRound], proto, nil, ledgercore.AccountTotals{}) var actualTotals ledgercore.AccountTotals - err = ml.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) actualTotals, err = arw.AccountsTotals(ctx, false) return @@ -1578,14 +1577,14 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) { i++ } - err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := ml.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { _, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1)) return }) require.NoError(b, err) } - err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := ml.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) return arw.UpdateAccountsHashRound(ctx, 1) }) @@ -2352,7 +2351,7 @@ func TestAcctUpdatesResources(t *testing.T) { err := au.prepareCommit(dcc) require.NoError(t, err) - err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) err = au.commitRound(ctx, tx, dcc) if err != nil { @@ -2636,7 +2635,7 @@ func auCommitSync(t *testing.T, rnd basics.Round, au *accountUpdates, ml *mockLe err := au.prepareCommit(dcc) require.NoError(t, err) - err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err 
error) { + err = ml.trackers.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) err = au.commitRound(ctx, tx, dcc) if err != nil { diff --git a/ledger/archival_test.go b/ledger/archival_test.go index c7f00b7d6a..cf96cb05c3 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -39,6 +39,7 @@ import ( "github.com/algorand/go-algorand/data/transactions/logic" "github.com/algorand/go-algorand/ledger/internal" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/ledger/store" "github.com/algorand/go-algorand/ledger/store/blockdb" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" @@ -75,7 +76,7 @@ func (wl *wrappedLedger) Latest() basics.Round { return wl.l.Latest() } -func (wl *wrappedLedger) trackerDB() db.Pair { +func (wl *wrappedLedger) trackerDB() store.TrackerStore { return wl.l.trackerDB() } diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 6968f580a0..220c6af6be 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -77,11 +77,6 @@ func catchpointStage1Decoder(r io.Reader) (io.ReadCloser, error) { return snappyReadCloser{snappy.NewReader(r)}, nil } -type catchpointStore interface { - store.CatchpointWriter - store.CatchpointReader -} - type catchpointTracker struct { // dbDirectory is the directory where the ledger and block sql file resides as well as the parent directory for the catchup files to be generated dbDirectory string @@ -103,8 +98,8 @@ type catchpointTracker struct { log logging.Logger // Connection to the database. - dbs db.Pair - catchpointStore catchpointStore + dbs store.TrackerStore + catchpointStore store.CatchpointReaderWriter // The last catchpoint label that was written to the database. Should always align with what's in the database. // note that this is the last catchpoint *label* and not the catchpoint file. 
@@ -216,7 +211,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic } } - f := func(ctx context.Context, tx *sql.Tx) error { + return ct.dbs.Batch(func(ctx context.Context, tx *sql.Tx) error { crw := store.NewCatchpointSQLReaderWriter(tx) err := ct.recordFirstStageInfo(ctx, tx, dbRound, totalKVs, totalAccounts, totalChunks, biggestChunkLen) if err != nil { @@ -225,8 +220,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic // Clear the db record. return crw.WriteCatchpointStateUint64(ctx, store.CatchpointStateWritingFirstStageInfo, 0) - } - return ct.dbs.Wdb.Atomic(f) + }) } // Possibly finish generating first stage catchpoint db record and data file after @@ -319,7 +313,10 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error { func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Round) (err error) { ct.log = l.trackerLog() ct.dbs = l.trackerDB() - ct.catchpointStore = store.NewCatchpointSQLReaderWriter(l.trackerDB().Wdb.Handle) + ct.catchpointStore, err = l.trackerDB().CreateCatchpointReaderWriter() + if err != nil { + return err + } ct.roundDigest = nil ct.catchpointDataWriting = 0 @@ -327,14 +324,14 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou ct.catchpointDataSlowWriting = make(chan struct{}, 1) close(ct.catchpointDataSlowWriting) - err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = ct.dbs.Batch(func(ctx context.Context, tx *sql.Tx) error { return ct.initializeHashes(ctx, tx, dbRound) }) if err != nil { return err } - ct.accountsq, err = store.AccountsInitDbQueries(ct.dbs.Rdb.Handle) + ct.accountsq, err = ct.dbs.CreateAccountsReader() if err != nil { return } @@ -777,9 +774,9 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound return err } - err = ct.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ct.dbs.Batch(func(ctx 
context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) - err = ct.recordCatchpointFile(ctx, tx, round, relCatchpointFilePath, fileInfo.Size()) + err = ct.recordCatchpointFile(ctx, crw, round, relCatchpointFilePath, fileInfo.Size()) if err != nil { return err } @@ -1090,7 +1087,7 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account var catchpointWriter *catchpointWriter start := time.Now() ledgerGeneratecatchpointCount.Inc(nil) - err = ct.dbs.Rdb.AtomicContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) { + err = ct.dbs.BatchContext(ctx, func(dbCtx context.Context, tx *sql.Tx) (err error) { catchpointWriter, err = makeCatchpointWriter(dbCtx, catchpointDataFilePath, tx, ResourcesPerCatchpointFileChunk) if err != nil { return @@ -1213,8 +1210,7 @@ func makeCatchpointDataFilePath(accountsRound basics.Round) string { // after a successful insert operation to the database, it would delete up to 2 old entries, as needed. // deleting 2 entries while inserting single entry allow us to adjust the size of the backing storage and have the // database and storage realign. -func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, e db.Executable, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) { - crw := store.NewCatchpointSQLReaderWriter(e) +func (ct *catchpointTracker) recordCatchpointFile(ctx context.Context, crw store.CatchpointReaderWriter, round basics.Round, relCatchpointFilePath string, fileSize int64) (err error) { if ct.catchpointFileHistoryLength != 0 { err = crw.StoreCatchpoint(ctx, round, relCatchpointFilePath, "", fileSize) if err != nil { @@ -1257,7 +1253,7 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS ledgerGetcatchpointCount.Inc(nil) // TODO: we need to generalize this, check @cce PoC PR, he has something // somewhat broken for some KVs.. 
- err := ct.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := ct.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) dbFileName, _, fileSize, err = crw.GetCatchpoint(ctx, round) return @@ -1277,8 +1273,11 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS if os.IsNotExist(err) { // the database told us that we have this file.. but we couldn't find it. // delete it from the database. - err := ct.recordCatchpointFile( - context.Background(), ct.dbs.Wdb.Handle, round, "", 0) + crw, err := ct.dbs.CreateCatchpointReaderWriter() + if err != nil { + return nil, err + } + err = ct.recordCatchpointFile(context.Background(), crw, round, "", 0) if err != nil { ct.log.Warnf("catchpointTracker.GetCatchpointStream() unable to delete missing catchpoint entry: %v", err) return nil, err @@ -1302,10 +1301,11 @@ func (ct *catchpointTracker) GetCatchpointStream(round basics.Round) (ReadCloseS // we couldn't get the stat, so just return with the file. return &readCloseSizer{ReadCloser: file, size: -1}, nil } - - err = ct.recordCatchpointFile( - context.Background(), ct.dbs.Wdb.Handle, round, relCatchpointFilePath, - fileInfo.Size()) + crw, err := ct.dbs.CreateCatchpointReaderWriter() + if err != nil { + return nil, err + } + err = ct.recordCatchpointFile(context.Background(), crw, round, relCatchpointFilePath, fileInfo.Size()) if err != nil { ct.log.Warnf("catchpointTracker.GetCatchpointStream() unable to save missing catchpoint entry: %v", err) } diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 33cfe9baed..22f38e599f 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -353,7 +353,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) { // at this point, the database was created. 
We want to fill the accounts data accountsNumber := 6000000 * b.N - err = ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) for i := 0; i < accountsNumber-5-2; { // subtract the account we've already created above, plus the sink/reward @@ -1002,7 +1002,8 @@ func TestFirstStagePersistence(t *testing.T) { defer ml2.Close() ml.Close() - cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + cps2, err := ml2.dbs.CreateCatchpointReaderWriter() + require.NoError(t, err) // Insert unfinished first stage record. err = cps2.WriteCatchpointStateUint64( @@ -1131,7 +1132,8 @@ func TestSecondStagePersistence(t *testing.T) { err = os.WriteFile(catchpointDataFilePath, catchpointData, 0644) require.NoError(t, err) - cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + cps2, err := ml2.dbs.CreateCatchpointReaderWriter() + require.NoError(t, err) // Restore the first stage database record. err = cps2.InsertOrReplaceCatchpointFirstStageInfo(context.Background(), firstStageRound, &firstStageInfo) @@ -1322,7 +1324,8 @@ func TestSecondStageDeletesUnfinishedCatchpointRecordAfterRestart(t *testing.T) defer ml2.Close() ml.Close() - cps2 := store.NewCatchpointSQLReaderWriter(ml2.dbs.Wdb.Handle) + cps2, err := ml2.dbs.CreateCatchpointReaderWriter() + require.NoError(t, err) // Sanity check: first stage record should be deleted. 
_, exists, err := cps2.SelectCatchpointFirstStageInfo(context.Background(), firstStageRound) diff --git a/ledger/catchpointwriter_test.go b/ledger/catchpointwriter_test.go index 478df2bb25..5bba123e84 100644 --- a/ledger/catchpointwriter_test.go +++ b/ledger/catchpointwriter_test.go @@ -46,7 +46,6 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" - "github.com/algorand/go-algorand/util/db" "github.com/algorand/msgp/msgp" ) @@ -129,8 +128,7 @@ func TestBasicCatchpointWriter(t *testing.T) { au.close() fileName := filepath.Join(temporaryDirectory, "15.data") - readDb := ml.trackerDB().Rdb - err = readDb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.trackerDB().Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { writer, err := makeCatchpointWriter(context.Background(), fileName, tx, ResourcesPerCatchpointFileChunk) if err != nil { return err @@ -185,7 +183,7 @@ func TestBasicCatchpointWriter(t *testing.T) { require.Equal(t, io.EOF, err) } -func testWriteCatchpoint(t *testing.T, rdb db.Accessor, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader { +func testWriteCatchpoint(t *testing.T, rdb store.TrackerStore, datapath string, filepath string, maxResourcesPerChunk int) CatchpointFileHeader { var totalAccounts uint64 var totalChunks uint64 var biggestChunkLen uint64 @@ -195,7 +193,7 @@ func testWriteCatchpoint(t *testing.T, rdb db.Accessor, datapath string, filepat maxResourcesPerChunk = ResourcesPerCatchpointFileChunk } - err := rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := rdb.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { writer, err := makeCatchpointWriter(context.Background(), datapath, tx, maxResourcesPerChunk) arw := store.NewAccountsSQLReaderWriter(tx) @@ -285,9 +283,8 @@ func TestCatchpointReadDatabaseOverflowSingleAccount(t *testing.T) { require.NoError(t, err) 
au.close() catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") - readDb := ml.trackerDB().Rdb - err = readDb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.trackerDB().Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { expectedTotalAccounts := uint64(1) totalAccountsWritten := uint64(0) totalResources := 0 @@ -372,9 +369,8 @@ func TestCatchpointReadDatabaseOverflowAccounts(t *testing.T) { require.NoError(t, err) au.close() catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") - readDb := ml.trackerDB().Rdb - err = readDb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = ml.trackerDB().Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) expectedTotalAccounts, err := arw.TotalAccounts(ctx) if err != nil { @@ -444,9 +440,9 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") const maxResourcesPerChunk = 5 - testWriteCatchpoint(t, ml.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk) + testWriteCatchpoint(t, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, maxResourcesPerChunk) - l := testNewLedgerFromCatchpoint(t, ml.trackerDB().Rdb, catchpointFilePath) + l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() // verify that the account data aligns with what we originally stored : @@ -463,65 +459,68 @@ func TestFullCatchpointWriterOverflowAccounts(t *testing.T) { // now manually construct the MT and ensure the reading makeOrderedAccountsIter works as expected: // no errors on read, hashes match ctx := context.Background() - tx, err := l.trackerDBs.Wdb.Handle.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) - require.NoError(t, err) - defer tx.Rollback() - - arw := 
store.NewAccountsSQLReaderWriter(tx) - - // save the existing hash - committer, err := store.MakeMerkleCommitter(tx, false) - require.NoError(t, err) - trie, err := merkletrie.MakeTrie(committer, store.TrieMemoryConfig) - require.NoError(t, err) + // tx, err := l.trackerDBs.Wdb.Handle.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + err = l.trackerDBs.TransactionContext(ctx, func(ctx context.Context, tx *sql.Tx) (err error) { + arw := store.NewAccountsSQLReaderWriter(tx) - h1, err := trie.RootHash() - require.NoError(t, err) - require.NotEmpty(t, h1) + // save the existing hash + committer, err := store.MakeMerkleCommitter(tx, false) + require.NoError(t, err) + trie, err := merkletrie.MakeTrie(committer, store.TrieMemoryConfig) + require.NoError(t, err) - // reset hashes - err = arw.ResetAccountHashes(ctx) - require.NoError(t, err) + h1, err := trie.RootHash() + require.NoError(t, err) + require.NotEmpty(t, h1) - // rebuild the MT - committer, err = store.MakeMerkleCommitter(tx, false) - require.NoError(t, err) - trie, err = merkletrie.MakeTrie(committer, store.TrieMemoryConfig) - require.NoError(t, err) + // reset hashes + err = arw.ResetAccountHashes(ctx) + require.NoError(t, err) - h, err := trie.RootHash() - require.NoError(t, err) - require.Zero(t, h) + // rebuild the MT + committer, err = store.MakeMerkleCommitter(tx, false) + require.NoError(t, err) + trie, err = merkletrie.MakeTrie(committer, store.TrieMemoryConfig) + require.NoError(t, err) - iter := store.MakeOrderedAccountsIter(tx, trieRebuildAccountChunkSize) - defer iter.Close(ctx) - for { - accts, _, err := iter.Next(ctx) - if err == sql.ErrNoRows { - // the account builder would return sql.ErrNoRows when no more data is available. 
- err = nil - break - } else if err != nil { - require.NoError(t, err) - } + h, err := trie.RootHash() + require.NoError(t, err) + require.Zero(t, h) - if len(accts) > 0 { - for _, acct := range accts { - added, err := trie.Add(acct.Digest) + iter := store.MakeOrderedAccountsIter(tx, trieRebuildAccountChunkSize) + defer iter.Close(ctx) + for { + accts, _, err := iter.Next(ctx) + if err == sql.ErrNoRows { + // the account builder would return sql.ErrNoRows when no more data is available. + err = nil + break + } else if err != nil { require.NoError(t, err) - require.True(t, added) + } + + if len(accts) > 0 { + for _, acct := range accts { + added, err := trie.Add(acct.Digest) + require.NoError(t, err) + require.True(t, added) + } } } - } - require.NoError(t, err) - h2, err := trie.RootHash() - require.NoError(t, err) - require.NotEmpty(t, h2) - require.Equal(t, h1, h2) + require.NoError(t, err) + h2, err := trie.RootHash() + require.NoError(t, err) + require.NotEmpty(t, h2) + + require.Equal(t, h1, h2) + + return nil + }) + require.NoError(t, err) } -func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess db.Accessor, filepath string) *Ledger { +func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess store.TrackerStore, filepath string) *Ledger { // create a ledger. 
var initState ledgercore.InitState initState.Block.CurrentProtocol = protocol.ConsensusCurrentVersion @@ -573,16 +572,16 @@ func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess db.Acc err = accessor.BuildMerkleTrie(context.Background(), nil) require.NoError(t, err) - err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = l.trackerDBs.Batch(func(ctx context.Context, tx *sql.Tx) error { crw := store.NewCatchpointSQLReaderWriter(tx) err := crw.ApplyCatchpointStagingBalances(ctx, 0, 0) return err }) require.NoError(t, err) - balanceTrieStats := func(db db.Accessor) merkletrie.Stats { + balanceTrieStats := func(db store.TrackerStore) merkletrie.Stats { var stats merkletrie.Stats - err = db.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = db.Transaction(func(ctx context.Context, tx *sql.Tx) (err error) { committer, err := store.MakeMerkleCommitter(tx, false) if err != nil { return err @@ -606,7 +605,7 @@ func testNewLedgerFromCatchpoint(t *testing.T, catchpointWriterReadAccess db.Acc // Skip invariant check for tests using mocks that do _not_ update // balancesTrie by checking for zero value stats. 
if ws != (merkletrie.Stats{}) { - require.Equal(t, ws, balanceTrieStats(l.trackerDBs.Rdb), "Invariant broken - Catchpoint writer and reader merkle tries should _always_ agree") + require.Equal(t, ws, balanceTrieStats(l.trackerDBs), "Invariant broken - Catchpoint writer and reader merkle tries should _always_ agree") } return l @@ -640,9 +639,9 @@ func TestFullCatchpointWriter(t *testing.T) { catchpointDataFilePath := filepath.Join(temporaryDirectory, "15.data") catchpointFilePath := filepath.Join(temporaryDirectory, "15.catchpoint") - testWriteCatchpoint(t, ml.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + testWriteCatchpoint(t, ml.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) - l := testNewLedgerFromCatchpoint(t, ml.trackerDB().Rdb, catchpointFilePath) + l := testNewLedgerFromCatchpoint(t, ml.trackerDB(), catchpointFilePath) defer l.Close() // verify that the account data aligns with what we originally stored : for addr, acct := range accts { @@ -688,10 +687,10 @@ func TestExactAccountChunk(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, cph.TotalChunks, 1) - l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB().Rdb, catchpointFilePath) + l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) defer l.Close() } @@ -739,10 +738,10 @@ func TestCatchpointAfterTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, 
dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, 2, cph.TotalChunks) - l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointFilePath) + l := testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) defer l.Close() values, err := l.LookupKeysByPrefix(l.Latest(), "bx:", 10) require.NoError(t, err) @@ -755,12 +754,12 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(&newacctpay) // Write and read back in, and ensure even the last effect exists. - cph = testWriteCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, cph.TotalChunks, 2) // Still only 2 chunks, as last was in a recent block // Drive home the point that `last` is _not_ included in the catchpoint by inspecting balance read from catchpoint. { - l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointFilePath) + l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) defer l.Close() _, _, algos, err := l.LookupLatest(last) require.NoError(t, err) @@ -771,10 +770,10 @@ func TestCatchpointAfterTxns(t *testing.T) { dl.fullBlock(pay.Noted(strconv.Itoa(i))) } - cph = testWriteCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + cph = testWriteCatchpoint(t, dl.validator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, cph.TotalChunks, 3) - l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB().Rdb, catchpointFilePath) + l = testNewLedgerFromCatchpoint(t, dl.validator.trackerDB(), catchpointFilePath) defer l.Close() values, err = l.LookupKeysByPrefix(l.Latest(), "bx:", 10) require.NoError(t, err) @@ -860,10 +859,10 @@ func TestCatchpointAfterBoxTxns(t *testing.T) { catchpointDataFilePath := filepath.Join(tempDir, t.Name()+".data") 
catchpointFilePath := filepath.Join(tempDir, t.Name()+".catchpoint.tar.gz") - cph := testWriteCatchpoint(t, dl.generator.trackerDB().Rdb, catchpointDataFilePath, catchpointFilePath, 0) + cph := testWriteCatchpoint(t, dl.generator.trackerDB(), catchpointDataFilePath, catchpointFilePath, 0) require.EqualValues(t, 2, cph.TotalChunks) - l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB().Rdb, catchpointFilePath) + l := testNewLedgerFromCatchpoint(t, dl.generator.trackerDB(), catchpointFilePath) defer l.Close() values, err := l.LookupKeysByPrefix(l.Latest(), "bx:", 10) diff --git a/ledger/catchupaccessor.go b/ledger/catchupaccessor.go index 41e14a167e..67d83f1f94 100644 --- a/ledger/catchupaccessor.go +++ b/ledger/catchupaccessor.go @@ -103,18 +103,18 @@ type stagingWriter interface { } type stagingWriterImpl struct { - wdb db.Accessor + wdb store.TrackerStore } func (w *stagingWriterImpl) writeBalances(ctx context.Context, balances []store.NormalizedAccountBalance) error { - return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + return w.wdb.Transaction(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) return crw.WriteCatchpointStagingBalances(ctx, balances) }) } func (w *stagingWriterImpl) writeKVs(ctx context.Context, kvrs []encoded.KVRecordV6) error { - return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + return w.wdb.Transaction(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) keys := make([][]byte, len(kvrs)) @@ -131,14 +131,14 @@ func (w *stagingWriterImpl) writeKVs(ctx context.Context, kvrs []encoded.KVRecor } func (w *stagingWriterImpl) writeCreatables(ctx context.Context, balances []store.NormalizedAccountBalance) error { - return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + return w.wdb.Transaction(func(ctx context.Context, tx *sql.Tx) error { crw := store.NewCatchpointSQLReaderWriter(tx) return 
crw.WriteCatchpointStagingCreatable(ctx, balances) }) } func (w *stagingWriterImpl) writeHashes(ctx context.Context, balances []store.NormalizedAccountBalance) error { - return w.wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + return w.wdb.Transaction(func(ctx context.Context, tx *sql.Tx) error { crw := store.NewCatchpointSQLReaderWriter(tx) err := crw.WriteCatchpointStagingHashes(ctx, balances) return err @@ -152,7 +152,7 @@ func (w *stagingWriterImpl) isShared() bool { // catchpointCatchupAccessorImpl is the concrete implementation of the CatchpointCatchupAccessor interface type catchpointCatchupAccessorImpl struct { ledger *Ledger - catchpointStore catchpointStore + catchpointStore store.CatchpointReaderWriter stagingWriter stagingWriter @@ -204,10 +204,11 @@ type CatchupAccessorClientLedger interface { // MakeCatchpointCatchupAccessor creates a CatchpointCatchupAccessor given a ledger func MakeCatchpointCatchupAccessor(ledger *Ledger, log logging.Logger) CatchpointCatchupAccessor { + crw, _ := ledger.trackerDB().CreateCatchpointReaderWriter() return &catchpointCatchupAccessorImpl{ ledger: ledger, - catchpointStore: store.NewCatchpointSQLReaderWriter(ledger.trackerDB().Wdb.Handle), - stagingWriter: &stagingWriterImpl{wdb: ledger.trackerDB().Wdb}, + catchpointStore: crw, + stagingWriter: &stagingWriterImpl{wdb: ledger.trackerDB()}, log: log, } } @@ -260,13 +261,12 @@ func (c *catchpointCatchupAccessorImpl) SetLabel(ctx context.Context, label stri // ResetStagingBalances resets the current staging balances, preparing for a new set of balances to be added func (c *catchpointCatchupAccessorImpl) ResetStagingBalances(ctx context.Context, newCatchup bool) (err error) { - wdb := c.ledger.trackerDB().Wdb if !newCatchup { c.ledger.setSynchronousMode(ctx, c.ledger.synchronousMode) } start := time.Now() ledgerResetstagingbalancesCount.Inc(nil) - err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = c.ledger.trackerDB().Batch(func(ctx 
context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) err = crw.ResetCatchpointStagingBalances(ctx, newCatchup) if err != nil { @@ -353,10 +353,9 @@ func (c *catchpointCatchupAccessorImpl) processStagingContent(ctx context.Contex // the following fields are now going to be ignored. We could add these to the database and validate these // later on: // TotalAccounts, TotalAccounts, Catchpoint, BlockHeaderDigest, BalancesRound - wdb := c.ledger.trackerDB().Wdb start := time.Now() ledgerProcessstagingcontentCount.Inc(nil) - err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = c.ledger.trackerDB().Batch(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) arw := store.NewAccountsSQLReaderWriter(tx) @@ -642,9 +641,8 @@ func countHashes(hashes [][]byte) (accountCount, kvCount uint64) { // BuildMerkleTrie would process the catchpointpendinghashes and insert all the items in it into the merkle trie func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, progressUpdates func(uint64, uint64)) (err error) { - wdb := c.ledger.trackerDB().Wdb - rdb := c.ledger.trackerDB().Rdb - err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + trackerdb := c.ledger.trackerDB() + err = trackerdb.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) // creating the index can take a while, so ensure we don't generate false alerts for no good reason. 
db.ResetTransactionWarnDeadline(ctx, tx, time.Now().Add(120*time.Second)) @@ -667,7 +665,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro defer wg.Done() defer close(writerQueue) - err := rdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err := trackerdb.Snapshot(func(transactionCtx context.Context, tx *sql.Tx) (err error) { it := store.MakeCatchpointPendingHashesIterator(trieRebuildAccountChunkSize, tx) var hashes [][]byte for { @@ -705,7 +703,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro accountHashesWritten, kvHashesWritten := uint64(0), uint64(0) var mc *store.MerkleCommitter - err := wdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err := trackerdb.Batch(func(transactionCtx context.Context, tx *sql.Tx) (err error) { // create the merkle trie for the balances mc, err = store.MakeMerkleCommitter(tx, true) if err != nil { @@ -734,7 +732,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro continue } - err = rdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err = trackerdb.Snapshot(func(transactionCtx context.Context, tx *sql.Tx) (err error) { mc, err = store.MakeMerkleCommitter(tx, true) if err != nil { return @@ -764,7 +762,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro } if uncommitedHashesCount >= trieRebuildCommitFrequency { - err = wdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err = trackerdb.Batch(func(transactionCtx context.Context, tx *sql.Tx) (err error) { // set a long 30-second window for the evict before warning is generated. 
db.ResetTransactionWarnDeadline(transactionCtx, tx, time.Now().Add(30*time.Second)) mc, err = store.MakeMerkleCommitter(tx, true) @@ -794,7 +792,7 @@ func (c *catchpointCatchupAccessorImpl) BuildMerkleTrie(ctx context.Context, pro return } if uncommitedHashesCount > 0 { - err = wdb.Atomic(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err = trackerdb.Batch(func(transactionCtx context.Context, tx *sql.Tx) (err error) { // set a long 30-second window for the evict before warning is generated. db.ResetTransactionWarnDeadline(transactionCtx, tx, time.Now().Add(30*time.Second)) mc, err = store.MakeMerkleCommitter(tx, true) @@ -835,7 +833,6 @@ func (c *catchpointCatchupAccessorImpl) GetCatchupBlockRound(ctx context.Context // VerifyCatchpoint verifies that the catchpoint is valid by reconstructing the label. func (c *catchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, blk *bookkeeping.Block) (err error) { - rdb := c.ledger.trackerDB().Rdb var balancesHash crypto.Digest var blockRound basics.Round var totals ledgercore.AccountTotals @@ -855,7 +852,7 @@ func (c *catchpointCatchupAccessorImpl) VerifyCatchpoint(ctx context.Context, bl start := time.Now() ledgerVerifycatchpointCount.Inc(nil) - err = rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = c.ledger.trackerDB().Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) // create the merkle trie for the balances mc, err0 := store.MakeMerkleCommitter(tx, true) @@ -905,10 +902,9 @@ func (c *catchpointCatchupAccessorImpl) StoreBalancesRound(ctx context.Context, catchpointLookback = config.Consensus[blk.CurrentProtocol].MaxBalLookback } balancesRound := blk.Round() - basics.Round(catchpointLookback) - wdb := c.ledger.trackerDB().Wdb start := time.Now() ledgerStorebalancesroundCount.Inc(nil) - err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = c.ledger.trackerDB().Batch(func(ctx context.Context, tx 
*sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) err = crw.WriteCatchpointStateUint64(ctx, store.CatchpointStateCatchupBalancesRound, uint64(balancesRound)) if err != nil { @@ -1002,10 +998,9 @@ func (c *catchpointCatchupAccessorImpl) CompleteCatchup(ctx context.Context) (er // finishBalances concludes the catchup of the balances(tracker) database. func (c *catchpointCatchupAccessorImpl) finishBalances(ctx context.Context) (err error) { - wdb := c.ledger.trackerDB().Wdb start := time.Now() ledgerCatchpointFinishBalsCount.Inc(nil) - err = wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = c.ledger.trackerDB().Batch(func(ctx context.Context, tx *sql.Tx) (err error) { crw := store.NewCatchpointSQLReaderWriter(tx) arw := store.NewAccountsSQLReaderWriter(tx) diff --git a/ledger/ledger.go b/ledger/ledger.go index cd023e5023..bf3e6f0a74 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -35,6 +35,7 @@ import ( "github.com/algorand/go-algorand/ledger/apply" "github.com/algorand/go-algorand/ledger/internal" "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/ledger/store" "github.com/algorand/go-algorand/ledger/store/blockdb" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" @@ -48,7 +49,7 @@ type Ledger struct { // Database connections to the DBs storing blocks and tracker state. // We use potentially different databases to avoid SQLite contention // during catchup. 
- trackerDBs db.Pair + trackerDBs store.TrackerStore blockDBs db.Pair // blockQ is the buffer of added blocks that will be flushed to @@ -138,8 +139,7 @@ func OpenLedger( err = fmt.Errorf("OpenLedger.openLedgerDB %v", err) return nil, err } - l.trackerDBs.Rdb.SetLogger(log) - l.trackerDBs.Wdb.SetLogger(log) + l.trackerDBs.SetLogger(log) l.blockDBs.Rdb.SetLogger(log) l.blockDBs.Wdb.SetLogger(log) @@ -273,7 +273,7 @@ func (l *Ledger) verifyMatchingGenesisHash() (err error) { return } -func openLedgerDB(dbPathPrefix string, dbMem bool) (trackerDBs db.Pair, blockDBs db.Pair, err error) { +func openLedgerDB(dbPathPrefix string, dbMem bool) (trackerDBs store.TrackerStore, blockDBs db.Pair, err error) { // Backwards compatibility: we used to store both blocks and tracker // state in a single SQLite db file. var trackerDBFilename string @@ -297,7 +297,7 @@ func openLedgerDB(dbPathPrefix string, dbMem bool) (trackerDBs db.Pair, blockDBs outErr := make(chan error, 2) go func() { var lerr error - trackerDBs, lerr = db.OpenPair(trackerDBFilename, dbMem) + trackerDBs, lerr = store.OpenTrackerSQLStore(trackerDBFilename, dbMem) outErr <- lerr }() @@ -328,7 +328,7 @@ func (l *Ledger) setSynchronousMode(ctx context.Context, synchronousMode db.Sync return } - err = l.trackerDBs.Wdb.SetSynchronousMode(ctx, synchronousMode, synchronousMode >= db.SynchronousModeFull) + err = l.trackerDBs.SetSynchronousMode(ctx, synchronousMode, synchronousMode >= db.SynchronousModeFull) if err != nil { l.log.Warnf("ledger.setSynchronousMode unable to set synchronous mode on trackers db: %v", err) return @@ -765,7 +765,7 @@ func (l *Ledger) GetCatchpointStream(round basics.Round) (ReadCloseSizer, error) } // ledgerForTracker methods -func (l *Ledger) trackerDB() db.Pair { +func (l *Ledger) trackerDB() store.TrackerStore { return l.trackerDBs } diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index b9a1c64037..6a2ca25e65 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -2279,7 
+2279,7 @@ func TestLedgerReloadTxTailHistoryAccess(t *testing.T) { // reset tables and re-init again, similary to the catchpount apply code // since the ledger has only genesis accounts, this recreates them - err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = l.trackerDBs.Batch(func(ctx context.Context, tx *sql.Tx) error { arw := store.NewAccountsSQLReaderWriter(tx) err0 := arw.AccountsReset(ctx) if err0 != nil { @@ -2335,7 +2335,7 @@ func TestLedgerReloadTxTailHistoryAccess(t *testing.T) { // drop new tables // reloadLedger should migrate db properly - err = l.trackerDBs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = l.trackerDBs.Batch(func(ctx context.Context, tx *sql.Tx) error { var resetExprs = []string{ `DROP TABLE IF EXISTS onlineaccounts`, `DROP TABLE IF EXISTS txtail`, @@ -2458,7 +2458,7 @@ func TestLedgerMigrateV6ShrinkDeltas(t *testing.T) { blockDB.Close() }() // create tables so online accounts can still be written - err = trackerDB.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = trackerDB.Batch(func(ctx context.Context, tx *sql.Tx) error { if err := store.AccountsUpdateSchemaTest(ctx, tx); err != nil { return err } @@ -2635,7 +2635,7 @@ func TestLedgerMigrateV6ShrinkDeltas(t *testing.T) { cfg.MaxAcctLookback = shorterLookback store.AccountDBVersion = 7 // delete tables since we want to check they can be made from other data - err = trackerDB.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = trackerDB.Batch(func(ctx context.Context, tx *sql.Tx) error { if _, err := tx.ExecContext(ctx, "DROP TABLE IF EXISTS onlineaccounts"); err != nil { return err } diff --git a/ledger/store/interface.go b/ledger/store/interface.go index d2f8fc4e00..3a4d7a7e3a 100644 --- a/ledger/store/interface.go +++ b/ledger/store/interface.go @@ -90,6 +90,7 @@ type CatchpointWriter interface { InsertUnfinishedCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest) error 
DeleteUnfinishedCatchpoint(ctx context.Context, round basics.Round) error DeleteOldCatchpointFirstStageInfo(ctx context.Context, maxRoundToDelete basics.Round) error + InsertOrReplaceCatchpointFirstStageInfo(ctx context.Context, round basics.Round, info *CatchpointFirstStageInfo) error DeleteStoredCatchpoints(ctx context.Context, dbDirectory string) (err error) } @@ -107,3 +108,9 @@ type CatchpointReader interface { SelectCatchpointFirstStageInfo(ctx context.Context, round basics.Round) (CatchpointFirstStageInfo, bool /*exists*/, error) SelectOldCatchpointFirstStageInfoRounds(ctx context.Context, maxRound basics.Round) ([]basics.Round, error) } + +// CatchpointReaderWriter is CatchpointReader+CatchpointWriter +type CatchpointReaderWriter interface { + CatchpointReader + CatchpointWriter +} diff --git a/ledger/store/store.go b/ledger/store/store.go new file mode 100644 index 0000000000..071c29a246 --- /dev/null +++ b/ledger/store/store.go @@ -0,0 +1,146 @@ +// Copyright (C) 2019-2023 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . 
+ +package store + +import ( + "context" + "database/sql" + + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/util/db" +) + +type trackerSQLStore struct { + // expose the internals for now so we can slowly change the code depending on them + pair db.Pair +} + +// TODO: maintain a SQL tx for now +type batchFn func(ctx context.Context, tx *sql.Tx) error + +// TODO: maintain a SQL tx for now +type snapshotFn func(ctx context.Context, tx *sql.Tx) error + +// TODO: maintain a SQL tx for now +type transactionFn func(ctx context.Context, tx *sql.Tx) error + +// TrackerStore is the interface for the tracker db. +type TrackerStore interface { + SetLogger(log logging.Logger) + SetSynchronousMode(ctx context.Context, mode db.SynchronousMode, fullfsync bool) (err error) + IsSharedCacheConnection() bool + + Batch(fn batchFn) (err error) + BatchContext(ctx context.Context, fn batchFn) (err error) + + Snapshot(fn snapshotFn) (err error) + SnapshotContext(ctx context.Context, fn snapshotFn) (err error) + + Transaction(fn transactionFn) (err error) + TransactionContext(ctx context.Context, fn transactionFn) (err error) + + CreateAccountsReader() (AccountsReader, error) + CreateOnlineAccountsReader() (OnlineAccountsReader, error) + + CreateCatchpointReaderWriter() (CatchpointReaderWriter, error) + + Vacuum(ctx context.Context) (stats db.VacuumStats, err error) + Close() +} + +// OpenTrackerSQLStore opens the sqlite database store +func OpenTrackerSQLStore(dbFilename string, dbMem bool) (store *trackerSQLStore, err error) { + db, err := db.OpenPair(dbFilename, dbMem) + if err != nil { + return + } + + return &trackerSQLStore{db}, nil +} + +// CreateTrackerSQLStore creates a tracker SQL db from sql db handle. 
+func CreateTrackerSQLStore(pair db.Pair) *trackerSQLStore { + return &trackerSQLStore{pair} +} + +// SetLogger sets the Logger, mainly for unit test quietness +func (s *trackerSQLStore) SetLogger(log logging.Logger) { + s.pair.Rdb.SetLogger(log) + s.pair.Wdb.SetLogger(log) +} + +func (s *trackerSQLStore) SetSynchronousMode(ctx context.Context, mode db.SynchronousMode, fullfsync bool) (err error) { + return s.pair.Wdb.SetSynchronousMode(ctx, mode, fullfsync) +} + +func (s *trackerSQLStore) IsSharedCacheConnection() bool { + return s.pair.Wdb.IsSharedCacheConnection() +} + +func (s *trackerSQLStore) Batch(fn batchFn) (err error) { + return s.BatchContext(context.Background(), fn) +} + +func (s *trackerSQLStore) BatchContext(ctx context.Context, fn batchFn) (err error) { + return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return fn(ctx, tx) + }) +} + +func (s *trackerSQLStore) Snapshot(fn snapshotFn) (err error) { + return s.SnapshotContext(context.Background(), fn) +} + +func (s *trackerSQLStore) SnapshotContext(ctx context.Context, fn snapshotFn) (err error) { + return s.pair.Rdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return fn(ctx, tx) + }) +} + +func (s *trackerSQLStore) Transaction(fn transactionFn) (err error) { + return s.TransactionContext(context.Background(), fn) +} + +func (s *trackerSQLStore) TransactionContext(ctx context.Context, fn transactionFn) (err error) { + return s.pair.Wdb.AtomicContext(ctx, func(ctx context.Context, tx *sql.Tx) error { + return fn(ctx, tx) + }) +} + +func (s *trackerSQLStore) CreateAccountsReader() (AccountsReader, error) { + return AccountsInitDbQueries(s.pair.Rdb.Handle) +} + +func (s *trackerSQLStore) CreateOnlineAccountsReader() (OnlineAccountsReader, error) { + return OnlineAccountsInitDbQueries(s.pair.Rdb.Handle) +} + +func (s *trackerSQLStore) CreateCatchpointReaderWriter() (CatchpointReaderWriter, error) { + w := 
NewCatchpointSQLReaderWriter(s.pair.Wdb.Handle) + return w, nil +} + +// TODO: rename: this is a sqlite specific name, this could also be used to trigger compact on KV stores. +// it seems to only be used during a v2 migration +func (s *trackerSQLStore) Vacuum(ctx context.Context) (stats db.VacuumStats, err error) { + stats, err = s.pair.Wdb.Vacuum(ctx) + return +} + +func (s *trackerSQLStore) Close() { + s.pair.Close() +} diff --git a/ledger/store/testing.go b/ledger/store/testing.go index 0e426a28c3..babeca5094 100644 --- a/ledger/store/testing.go +++ b/ledger/store/testing.go @@ -19,15 +19,28 @@ package store import ( "context" "database/sql" + "fmt" + "strings" "testing" "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/db" "github.com/stretchr/testify/require" ) +// DbOpenTrackerTest opens a sqlite db file for testing purposes. +func DbOpenTrackerTest(t testing.TB, inMemory bool) (TrackerStore, string) { + fn := fmt.Sprintf("%s.%d", strings.ReplaceAll(t.Name(), "/", "."), crypto.RandUint64()) + + dbs, err := db.OpenPair(fn, inMemory) + require.NoErrorf(t, err, "Filename : %s\nInMemory: %v", fn, inMemory) + + return &trackerSQLStore{dbs}, fn +} + // AccountsInitLightTest initializes an empty database for testing without the extra methods being called. func AccountsInitLightTest(tb testing.TB, tx *sql.Tx, initAccounts map[basics.Address]basics.AccountData, proto config.ConsensusParams) (newDatabase bool, err error) { newDB, err := accountsInit(tx, initAccounts, proto) diff --git a/ledger/tracker.go b/ledger/tracker.go index b87c6fbe3f..d43faec3f5 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -134,7 +134,7 @@ type ledgerTracker interface { // ledgerForTracker defines the part of the ledger that a tracker can // access. This is particularly useful for testing trackers in isolation. 
type ledgerForTracker interface { - trackerDB() db.Pair + trackerDB() store.TrackerStore blockDB() db.Pair trackerLog() logging.Logger trackerEvalVerified(bookkeeping.Block, internal.LedgerForEvaluator) (ledgercore.StateDelta, error) @@ -174,7 +174,7 @@ type trackerRegistry struct { // cached to avoid SQL queries. dbRound basics.Round - dbs db.Pair + dbs store.TrackerStore log logging.Logger // the synchronous mode that would be used for the account database. @@ -279,7 +279,7 @@ func (tr *trackerRegistry) initialize(l ledgerForTracker, trackers []ledgerTrack tr.dbs = l.trackerDB() tr.log = l.trackerLog() - err = tr.dbs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err = tr.dbs.Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) tr.dbRound, err = arw.AccountsRound() return err @@ -510,7 +510,7 @@ func (tr *trackerRegistry) commitRound(dcc *deferredCommitContext) error { start := time.Now() ledgerCommitroundCount.Inc(nil) - err := tr.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := tr.dbs.Batch(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) for _, lt := range tr.trackers { err0 := lt.commitRound(ctx, tx, dcc) @@ -631,7 +631,7 @@ func (tr *trackerRegistry) replay(l ledgerForTracker) (err error) { defer func() { if rollbackSynchronousMode { // restore default synchronous mode - err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.synchronousMode, tr.synchronousMode >= db.SynchronousModeFull) + err0 := tr.dbs.SetSynchronousMode(context.Background(), tr.synchronousMode, tr.synchronousMode >= db.SynchronousModeFull) // override the returned error only in case there is no error - since this // operation has a lower criticality. 
if err == nil { @@ -662,7 +662,7 @@ func (tr *trackerRegistry) replay(l ledgerForTracker) (err error) { if !rollbackSynchronousMode { // switch to rebuild synchronous mode to improve performance - err0 := tr.dbs.Wdb.SetSynchronousMode(context.Background(), tr.accountsRebuildSynchronousMode, tr.accountsRebuildSynchronousMode >= db.SynchronousModeFull) + err0 := tr.dbs.SetSynchronousMode(context.Background(), tr.accountsRebuildSynchronousMode, tr.accountsRebuildSynchronousMode >= db.SynchronousModeFull) if err0 != nil { tr.log.Warnf("trackerRegistry.replay was unable to switch to rbuild synchronous mode : %v", err0) } else { diff --git a/ledger/trackerdb.go b/ledger/trackerdb.go index 17c6872d95..e65b7ba2cb 100644 --- a/ledger/trackerdb.go +++ b/ledger/trackerdb.go @@ -39,7 +39,7 @@ func trackerDBInitialize(l ledgerForTracker, catchpointEnabled bool, dbPathPrefi return } - err = dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + err = dbs.Batch(func(ctx context.Context, tx *sql.Tx) error { arw := store.NewAccountsSQLReaderWriter(tx) tp := store.TrackerDBParams{ diff --git a/ledger/txtail.go b/ledger/txtail.go index 879f0ee7d3..4fc1ea63d4 100644 --- a/ledger/txtail.go +++ b/ledger/txtail.go @@ -91,14 +91,13 @@ type txTail struct { } func (t *txTail) loadFromDisk(l ledgerForTracker, dbRound basics.Round) error { - rdb := l.trackerDB().Rdb t.log = l.trackerLog() var roundData []*store.TxTailRound var roundTailHashes []crypto.Digest var baseRound basics.Round if dbRound > 0 { - err := rdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { + err := l.trackerDB().Snapshot(func(ctx context.Context, tx *sql.Tx) (err error) { arw := store.NewAccountsSQLReaderWriter(tx) roundData, roundTailHashes, baseRound, err = arw.LoadTxTail(ctx, dbRound) return diff --git a/ledger/txtail_test.go b/ledger/txtail_test.go index fce531aa9f..f794699a1e 100644 --- a/ledger/txtail_test.go +++ b/ledger/txtail_test.go @@ -18,6 +18,7 @@ package ledger import ( "context" + 
"database/sql" "errors" "fmt" "testing" @@ -150,32 +151,34 @@ func (t *txTailTestLedger) initialize(ts *testing.T, protoVersion protocol.Conse // create a corresponding blockdb. inMemory := true t.blockDBs, _ = storetesting.DbOpenTest(ts, inMemory) - t.trackerDBs, _ = storetesting.DbOpenTest(ts, inMemory) + t.trackerDBs, _ = store.DbOpenTrackerTest(ts, inMemory) t.protoVersion = protoVersion - tx, err := t.trackerDBs.Wdb.Handle.Begin() - require.NoError(ts, err) - - arw := store.NewAccountsSQLReaderWriter(tx) - - accts := ledgertesting.RandomAccounts(20, true) - proto := config.Consensus[protoVersion] - newDB := store.AccountsInitTest(ts, tx, accts, protoVersion) - require.True(ts, newDB) - - roundData := make([][]byte, 0, proto.MaxTxnLife) - startRound := t.Latest() - basics.Round(proto.MaxTxnLife) + 1 - for i := startRound; i <= t.Latest(); i++ { - blk, err := t.Block(i) - require.NoError(ts, err) - tail, err := store.TxTailRoundFromBlock(blk) + err := t.trackerDBs.Transaction(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + arw := store.NewAccountsSQLReaderWriter(tx) + + accts := ledgertesting.RandomAccounts(20, true) + proto := config.Consensus[protoVersion] + newDB := store.AccountsInitTest(ts, tx, accts, protoVersion) + require.True(ts, newDB) + + roundData := make([][]byte, 0, proto.MaxTxnLife) + startRound := t.Latest() - basics.Round(proto.MaxTxnLife) + 1 + for i := startRound; i <= t.Latest(); i++ { + blk, err := t.Block(i) + require.NoError(ts, err) + tail, err := store.TxTailRoundFromBlock(blk) + require.NoError(ts, err) + encoded, _ := tail.Encode() + roundData = append(roundData, encoded) + } + err = arw.TxtailNewRound(context.Background(), startRound, roundData, 0) require.NoError(ts, err) - encoded, _ := tail.Encode() - roundData = append(roundData, encoded) - } - err = arw.TxtailNewRound(context.Background(), startRound, roundData, 0) + + return nil + }) require.NoError(ts, err) - tx.Commit() + return nil } @@ -296,12 +299,13 @@ 
func TestTxTailDeltaTracking(t *testing.T) { err = txtail.prepareCommit(dcc) require.NoError(t, err) - tx, err := ledger.trackerDBs.Wdb.Handle.Begin() + err := ledger.trackerDBs.Transaction(func(transactionCtx context.Context, tx *sql.Tx) (err error) { + err = txtail.commitRound(context.Background(), tx, dcc) + require.NoError(t, err) + return nil + }) require.NoError(t, err) - err = txtail.commitRound(context.Background(), tx, dcc) - require.NoError(t, err) - tx.Commit() proto := config.Consensus[protoVersion] retainSize := proto.MaxTxnLife + proto.DeeperBlockHeaderHistory if uint64(i) > proto.MaxTxnLife*2 {