diff --git a/api/handlers_e2e_test.go b/api/handlers_e2e_test.go index 67c66cf1b..bd7ba3206 100644 --- a/api/handlers_e2e_test.go +++ b/api/handlers_e2e_test.go @@ -29,13 +29,18 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin db, _, err := postgres.OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) + newShutdownFunc := func() { + db.Close() + shutdownFunc() + } + err = db.LoadGenesis(genesis) require.NoError(t, err) err = db.AddBlock(&genesisBlock) require.NoError(t, err) - return db, shutdownFunc + return db, newShutdownFunc } func TestApplicationHander(t *testing.T) { diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index 2f1df571c..8d4b12f62 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -6,6 +6,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" @@ -83,8 +84,13 @@ var daemonCmd = &cobra.Command{ opts.ReadOnly = true } db, availableCh := indexerDbFromFlags(opts) + defer db.Close() + var wg sync.WaitGroup if bot != nil { + wg.Add(1) go func() { + defer wg.Done() + // Wait until the database is available. <-availableCh @@ -97,13 +103,21 @@ var daemonCmd = &cobra.Command{ maybeFail(err, "failed to get next round, %v", err) bot.SetNextRound(nextRound) - bih := blockImporterHandler{imp: importer.NewImporter(db)} - bot.AddBlockHandler(&bih) - bot.SetContext(ctx) + imp := importer.NewImporter(db) + handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error { + return handleBlock(block, &imp) + } + bot.SetBlockHandler(handler) logger.Info("Starting block importer.") - bot.Run() - cf() + err = bot.Run(ctx) + if err != nil { + // If context is not expired. + if ctx.Err() == nil { + logger.WithError(err).Errorf("fetcher exited with error") + os.Exit(1) + } + } }() } else { logger.Info("No block importer configured.") @@ -112,6 +126,7 @@ var daemonCmd = &cobra.Command{ fmt.Printf("serving on %s\n", daemonServerAddr) logger.Infof("serving on %s", daemonServerAddr) api.Serve(ctx, daemonServerAddr, db, bot, logger, makeOptions()) + wg.Wait() }, } @@ -159,14 +174,14 @@ func makeOptions() (options api.ExtraOptions) { return } -type blockImporterHandler struct { - imp importer.Importer -} - -func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) { +func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error { start := time.Now() - err := bih.imp.ImportBlock(block) - maybeFail(err, "adding block %d to database failed", block.Block.Round()) + err := imp.ImportBlock(block) + if err != nil { + logger.WithError(err).Errorf( + "adding block %d to database failed", block.Block.Round()) + return fmt.Errorf("handleBlock() err: %w", err) + } dt := time.Since(start) // Ignore round 0 (which is empty). @@ -177,4 +192,6 @@ func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) { } logger.Infof("round r=%d (%d txn) imported in %s", block.Block.Round(), len(block.Block.Payset), dt.String()) + + return nil } diff --git a/cmd/algorand-indexer/import.go b/cmd/algorand-indexer/import.go index 09de09e36..331611e0a 100644 --- a/cmd/algorand-indexer/import.go +++ b/cmd/algorand-indexer/import.go @@ -24,6 +24,7 @@ var importCmd = &cobra.Command{ } db, availableCh := indexerDbFromFlags(idb.IndexerDbOptions{}) + defer db.Close() <-availableCh helper := importer.NewImportHelper( diff --git a/cmd/import-validator/main.go b/cmd/import-validator/main.go index e695be8e9..f075a6cb2 100644 --- a/cmd/import-validator/main.go +++ b/cmd/import-validator/main.go @@ -36,18 +36,6 @@ import ( "github.com/algorand/indexer/util" ) -type blockHandler struct { - f func(*rpcs.EncodedBlockCert) error -} - -func (h blockHandler) HandleBlock(block *rpcs.EncodedBlockCert) { - err := h.f(block) - if err != nil { - fmt.Printf("error handling block %d err: %v\n", block.Block.Round(), err) - os.Exit(1) - } -} - func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) { data, err := client.BlockRaw(0).Do(context.Background()) if err != nil { @@ -278,7 +266,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg nextRoundIndexer, nextRoundLedger) } - blockHandlerFunc := func(block *rpcs.EncodedBlockCert) error { + blockHandlerFunc := func(ctx context.Context, block *rpcs.EncodedBlockCert) error { var modifiedAccounts []basics.Address var err0 error var err1 error @@ -321,9 +309,12 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg return checkModifiedAccounts(db, l, &block.Block, modifiedAccounts) } - bot.AddBlockHandler(blockHandler{f: blockHandlerFunc}) + bot.SetBlockHandler(blockHandlerFunc) bot.SetNextRound(nextRoundLedger) - bot.Run() + err = bot.Run(context.Background()) + if err != nil { + return fmt.Errorf("catchup err: %w", err) + } return nil } diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index 4d5d81b2e..38eedd4bf 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -21,33 +21,24 @@ type Fetcher interface { Algod() *algod.Client // go bot.Run() - Run() + Run(ctx context.Context) error - AddBlockHandler(handler BlockHandler) - SetContext(ctx context.Context) + SetBlockHandler(f func(context.Context, *rpcs.EncodedBlockCert) error) SetNextRound(nextRound uint64) // Error returns any error fetcher is currently experiencing. Error() string } -// BlockHandler is the handler fetcher uses to process a block. -type BlockHandler interface { - HandleBlock(block *rpcs.EncodedBlockCert) -} - type fetcherImpl struct { algorandData string aclient *algod.Client algodLastmod time.Time // newest mod time of algod.net algod.token - blockHandlers []BlockHandler + handler func(context.Context, *rpcs.EncodedBlockCert) error nextRound uint64 - ctx context.Context - done bool - failingSince time.Time log *log.Logger @@ -76,71 +67,64 @@ func (bot *fetcherImpl) Algod() *algod.Client { return bot.aclient } -func (bot *fetcherImpl) isDone() bool { - if bot.done { - return true - } - if bot.ctx == nil { - return false - } - select { - case <-bot.ctx.Done(): - bot.done = true - return true - default: - return false - } -} - func (bot *fetcherImpl) setError(err error) { bot.errmu.Lock() bot.err = err bot.errmu.Unlock() } -func (bot *fetcherImpl) processQueue() { +func (bot *fetcherImpl) processQueue(ctx context.Context) error { for { - block, ok := <-bot.blockQueue - if !ok { - return + select { + case block, ok := <-bot.blockQueue: + if !ok { + return nil + } + err := bot.handler(ctx, block) + if err != nil { + return fmt.Errorf("processQueue() handler err: %w", err) + } + case <-ctx.Done(): + return fmt.Errorf("processQueue: ctx.Err(): %w", ctx.Err()) } - bot.handleBlock(block) } } -func (bot *fetcherImpl) enqueueBlock(blockbytes []byte) error { +func (bot *fetcherImpl) enqueueBlock(ctx context.Context, blockbytes []byte) error { block := new(rpcs.EncodedBlockCert) err := protocol.Decode(blockbytes, block) if err != nil { - return nil + return fmt.Errorf("enqueueBlock() decode err: %w", err) } - bot.blockQueue <- block - return nil + select { + case <-ctx.Done(): + return ctx.Err() + case bot.blockQueue <- block: + return nil + } } // fetch the next block by round number until we find one missing (because it doesn't exist yet) -func (bot *fetcherImpl) catchupLoop() { +func (bot *fetcherImpl) catchupLoop(ctx context.Context) error { var err error var blockbytes []byte aclient := bot.Algod() for { - if bot.isDone() { - return - } - - blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(context.Background()) + blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(ctx) if err != nil { + // If context has expired. + if ctx.Err() != nil { + return fmt.Errorf("catchupLoop() fetch err: %w", err) + } bot.setError(err) bot.log.WithError(err).Errorf("catchup block %d", bot.nextRound) - return + return nil } - err = bot.enqueueBlock(blockbytes) + err = bot.enqueueBlock(ctx, blockbytes) if err != nil { - bot.setError(err) - bot.log.WithError(err).Errorf("error enqueuing catchup block %d", bot.nextRound) - return + return fmt.Errorf("catchupLoop() err: %w", err) } // If we successfully handle the block, clear out any transient error which may have occurred. bot.setError(nil) @@ -150,35 +134,37 @@ func (bot *fetcherImpl) catchupLoop() { } // wait for algod to notify of a new round, then fetch that block -func (bot *fetcherImpl) followLoop() { +func (bot *fetcherImpl) followLoop(ctx context.Context) error { var err error var blockbytes []byte aclient := bot.Algod() for { for retries := 0; retries < 3; retries++ { - if bot.isDone() { - return - } - _, err = aclient.StatusAfterBlock(bot.nextRound).Do(context.Background()) + _, err = aclient.StatusAfterBlock(bot.nextRound).Do(ctx) if err != nil { - bot.log.WithError(err).Errorf("r=%d error getting status %d", retries, bot.nextRound) + // If context has expired. + if ctx.Err() != nil { + return fmt.Errorf("followLoop() status err: %w", err) + } + bot.log.WithError(err).Errorf( + "r=%d error getting status %d", retries, bot.nextRound) continue } - blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(context.Background()) + blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(ctx) if err == nil { break + } else if ctx.Err() != nil { // if context has expired + return fmt.Errorf("followLoop() fetch block err: %w", err) } bot.log.WithError(err).Errorf("r=%d err getting block %d", retries, bot.nextRound) } if err != nil { bot.setError(err) - return + return nil } - err = bot.enqueueBlock(blockbytes) + err = bot.enqueueBlock(ctx, blockbytes) if err != nil { - bot.setError(err) - bot.log.WithError(err).Errorf("error enqueuing follow block %d", bot.nextRound) - break + return fmt.Errorf("followLoop() err: %w", err) } // Clear out any transient error which may have occurred. bot.setError(nil) @@ -187,21 +173,15 @@ func (bot *fetcherImpl) followLoop() { } } -// Run is part of the Fetcher interface -func (bot *fetcherImpl) Run() { - // In theory a buffer of size one should be enough, but let's make it bigger. - bot.blockQueue = make(chan *rpcs.EncodedBlockCert, 5) - defer close(bot.blockQueue) - go bot.processQueue() - +func (bot *fetcherImpl) mainLoop(ctx context.Context) error { for { - if bot.isDone() { - return + err := bot.catchupLoop(ctx) + if err != nil { + return fmt.Errorf("mainLoop() err: %w", err) } - bot.catchupLoop() - bot.followLoop() - if bot.isDone() { - return + err = bot.followLoop(ctx) + if err != nil { + return fmt.Errorf("mainLoop() err: %w", err) } if bot.failingSince.IsZero() { @@ -212,7 +192,7 @@ func (bot *fetcherImpl) Run() { bot.log.Warnf("failing to fetch from algod for %s, (since %s, now %s)", dt.String(), bot.failingSince.String(), now.String()) } time.Sleep(5 * time.Second) - err := bot.reclient() + err = bot.reclient() if err != nil { bot.setError(err) bot.log.WithError(err).Errorln("err trying to re-client") @@ -222,9 +202,32 @@ func (bot *fetcherImpl) Run() { } } -// SetContext is part of the Fetcher interface -func (bot *fetcherImpl) SetContext(ctx context.Context) { - bot.ctx = ctx +// Run is part of the Fetcher interface +func (bot *fetcherImpl) Run(ctx context.Context) error { + bot.blockQueue = make(chan *rpcs.EncodedBlockCert) + + ctx, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + + ch0 := make(chan error, 1) + go func() { + ch0 <- bot.processQueue(ctx) + }() + + ch1 := make(chan error, 1) + go func() { + ch1 <- bot.mainLoop(ctx) + }() + + select { + case err := <-ch0: + cancelFunc() + return fmt.Errorf("Run() err: %w", err) + case err := <-ch1: + cancelFunc() + <-ch0 + return fmt.Errorf("Run() err: %w", err) + } } // SetNextRound is part of the Fetcher interface @@ -232,26 +235,9 @@ func (bot *fetcherImpl) SetNextRound(nextRound uint64) { bot.nextRound = nextRound } -func (bot *fetcherImpl) handleBlock(block *rpcs.EncodedBlockCert) { - for _, handler := range bot.blockHandlers { - handler.HandleBlock(block) - } -} - // AddBlockHandler is part of the Fetcher interface -func (bot *fetcherImpl) AddBlockHandler(handler BlockHandler) { - if bot.blockHandlers == nil { - x := make([]BlockHandler, 1, 10) - x[0] = handler - bot.blockHandlers = x - return - } - for _, oh := range bot.blockHandlers { - if oh == handler { - return - } - } - bot.blockHandlers = append(bot.blockHandlers, handler) +func (bot *fetcherImpl) SetBlockHandler(handler func(context.Context, *rpcs.EncodedBlockCert) error) { + bot.handler = handler } // ForDataDir initializes Fetcher to read data from the data directory. diff --git a/idb/dummy/dummy.go b/idb/dummy/dummy.go index eed62c793..ceae2c406 100644 --- a/idb/dummy/dummy.go +++ b/idb/dummy/dummy.go @@ -20,6 +20,9 @@ func IndexerDb() idb.IndexerDb { return &dummyIndexerDb{} } +func (db *dummyIndexerDb) Close() { +} + func (db *dummyIndexerDb) AddBlock(block *bookkeeping.Block) error { db.log.Printf("AddBlock") return nil diff --git a/idb/idb.go b/idb/idb.go index cddc1d401..bbd865962 100644 --- a/idb/idb.go +++ b/idb/idb.go @@ -155,6 +155,10 @@ var ErrorBlockNotFound = errors.New("block not found") // TODO: sqlite3 impl // TODO: cockroachdb impl type IndexerDb interface { + // Close all connections to the database. Should be called when IndexerDb is + // no longer needed. + Close() + // Import a block and do the accounting. AddBlock(block *bookkeeping.Block) error diff --git a/idb/mocks/IndexerDb.go b/idb/mocks/IndexerDb.go index 650bec4f5..8cc26c02c 100644 --- a/idb/mocks/IndexerDb.go +++ b/idb/mocks/IndexerDb.go @@ -104,6 +104,11 @@ func (_m *IndexerDb) Assets(ctx context.Context, filter idb.AssetsQuery) (<-chan return r0, r1 } +// Close provides a mock function with given fields: +func (_m *IndexerDb) Close() { + _m.Called() +} + // GetAccounts provides a mock function with given fields: ctx, opts func (_m *IndexerDb) GetAccounts(ctx context.Context, opts idb.AccountQueryOptions) (<-chan idb.AccountRow, uint64) { ret := _m.Called(ctx, opts) diff --git a/idb/postgres/internal/testing/testing.go b/idb/postgres/internal/testing/testing.go index 8b5dc12a8..433225353 100644 --- a/idb/postgres/internal/testing/testing.go +++ b/idb/postgres/internal/testing/testing.go @@ -20,9 +20,6 @@ var testpg = flag.String( func SetupPostgres(t *testing.T) (*pgxpool.Pool, string, func()) { if testpg != nil && *testpg != "" { // use non-docker Postgresql - shutdownFunc := func() { - // nothing to do, psql db setup/teardown is external - } connStr := *testpg db, err := pgxpool.Connect(context.Background(), connStr) @@ -32,6 +29,10 @@ func SetupPostgres(t *testing.T) (*pgxpool.Pool, string, func()) { context.Background(), `DROP SCHEMA public CASCADE; CREATE SCHEMA public;`) require.NoError(t, err) + shutdownFunc := func() { + db.Close() + } + return db, connStr, shutdownFunc } @@ -43,11 +44,6 @@ func SetupPostgres(t *testing.T) (*pgxpool.Pool, string, func()) { container, err := gnomock.Start(p) require.NoError(t, err, "Error starting gnomock") - shutdownFunc := func() { - err = gnomock.Stop(container) - require.NoError(t, err, "Error stoping gnomock") - } - connStr := fmt.Sprintf( "host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", container.Host, container.DefaultPort(), @@ -57,5 +53,11 @@ func SetupPostgres(t *testing.T) (*pgxpool.Pool, string, func()) { db, err := pgxpool.Connect(context.Background(), connStr) require.NoError(t, err, "Error opening postgres connection") + shutdownFunc := func() { + db.Close() + err = gnomock.Stop(container) + require.NoError(t, err, "Error stoping gnomock") + } + return db, connStr, shutdownFunc } diff --git a/idb/postgres/postgres.go b/idb/postgres/postgres.go index a159d1ae1..8de47a7ea 100644 --- a/idb/postgres/postgres.go +++ b/idb/postgres/postgres.go @@ -107,6 +107,11 @@ type IndexerDb struct { accountingLock sync.Mutex } +// Close is part of idb.IndexerDb. +func (db *IndexerDb) Close() { + db.db.Close() +} + // txWithRetry is a helper function that retries the function `f` in case the database // transaction in it fails due to a serialization error. `f` is provided // a transaction created using `opts`. If `f` experiences a database error, this error diff --git a/idb/postgres/postgres_integration_common_test.go b/idb/postgres/postgres_integration_common_test.go index 776f31551..4b4efa3cd 100644 --- a/idb/postgres/postgres_integration_common_test.go +++ b/idb/postgres/postgres_integration_common_test.go @@ -27,7 +27,14 @@ func setupIdbWithConnectionString(t *testing.T, connStr string, genesis bookkeep func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeeping.Block) (*IndexerDb /*db*/, func() /*shutdownFunc*/) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) - return setupIdbWithConnectionString(t, connStr, genesis, genesisBlock), shutdownFunc + + db := setupIdbWithConnectionString(t, connStr, genesis, genesisBlock) + newShutdownFunc := func() { + db.Close() + shutdownFunc() + } + + return db, newShutdownFunc } // Helper to execute a query returning an integer, for example COUNT(*). Returns -1 on an error. diff --git a/idb/postgres/postgres_integration_test.go b/idb/postgres/postgres_integration_test.go index 00c5cd827..af115841d 100644 --- a/idb/postgres/postgres_integration_test.go +++ b/idb/postgres/postgres_integration_test.go @@ -34,6 +34,7 @@ func TestMaxRoundOnUninitializedDB(t *testing.T) { db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) assert.NoError(t, err) + defer db.Close() round, err := db.GetNextRoundToAccount() assert.Equal(t, idb.ErrorNotInitialized, err) @@ -51,6 +52,8 @@ func TestMaxRound(t *testing.T) { pdb, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) assert.NoError(t, err) + defer pdb.Close() + db.Exec( context.Background(), `INSERT INTO metastate (k, v) values ($1, $2)`, @@ -72,6 +75,8 @@ func TestAccountedRoundNextRound0(t *testing.T) { pdb, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) assert.NoError(t, err) + defer pdb.Close() + db.Exec( context.Background(), `INSERT INTO metastate (k, v) values ($1, $2)`, @@ -946,6 +951,7 @@ func TestInitializationNewDatabase(t *testing.T) { db, availableCh, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) + defer db.Close() _, ok := <-availableCh assert.False(t, ok) @@ -961,11 +967,12 @@ func TestOpenDbAgain(t *testing.T) { _, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() - _, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) - require.NoError(t, err) - - _, _, err = OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) - require.NoError(t, err) + for i := 0; i < 2; i++ { + db, availableCh, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) + require.NoError(t, err) + <-availableCh + db.Close() + } } func requireNilOrEqual(t *testing.T, expected string, actual *string) { @@ -1298,6 +1305,7 @@ func TestAddBlockIncrementsMaxRoundAccounted(t *testing.T) { defer shutdownFunc() db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) + defer db.Close() err = db.LoadGenesis(test.MakeGenesis()) require.NoError(t, err) @@ -1531,7 +1539,9 @@ func TestSearchForInnerTransactionReturnsRootTransaction(t *testing.T) { // Given: A DB with one transaction containing inner transactions [app -> pay -> xfer] pdb, connStr, shutdownFunc := pgtest.SetupPostgres(t) defer shutdownFunc() - db := setupIdbWithConnectionString(t, connStr, test.MakeGenesis(), test.MakeGenesisBlock()) + db := setupIdbWithConnectionString( + t, connStr, test.MakeGenesis(), test.MakeGenesisBlock()) + defer db.Close() appCall := test.MakeAppCallWithInnerTxn(test.AccountA, appAddr, test.AccountB, appAddr, test.AccountC) @@ -1644,6 +1654,7 @@ func TestLoadGenesisAccountTotals(t *testing.T) { defer shutdownFunc() db, _, err := OpenPostgres(connStr, idb.IndexerDbOptions{}, nil) require.NoError(t, err) + defer db.Close() err = db.LoadGenesis(test.MakeGenesis()) require.NoError(t, err)