Skip to content

Commit

Permalink
Add IndexerDb.Close() (#801)
Browse files Browse the repository at this point in the history
* Add IndexerDb.Close().

* fix a bug

* Stop block import gracefully.

* Improve context usage in fetcher.

* PR feedback.

* more feedback

* Add buffers for error channels so goroutines can exit.
  • Loading branch information
tolikzinovyev authored Dec 8, 2021
1 parent f4ff381 commit 7655717
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 139 deletions.
7 changes: 6 additions & 1 deletion api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin
db, _, err := postgres.OpenPostgres(connStr, idb.IndexerDbOptions{}, nil)
require.NoError(t, err)

newShutdownFunc := func() {
db.Close()
shutdownFunc()
}

err = db.LoadGenesis(genesis)
require.NoError(t, err)

err = db.AddBlock(&genesisBlock)
require.NoError(t, err)

return db, shutdownFunc
return db, newShutdownFunc
}

func TestApplicationHander(t *testing.T) {
Expand Down
41 changes: 29 additions & 12 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -83,8 +84,13 @@ var daemonCmd = &cobra.Command{
opts.ReadOnly = true
}
db, availableCh := indexerDbFromFlags(opts)
defer db.Close()
var wg sync.WaitGroup
if bot != nil {
wg.Add(1)
go func() {
defer wg.Done()

// Wait until the database is available.
<-availableCh

Expand All @@ -97,13 +103,21 @@ var daemonCmd = &cobra.Command{
maybeFail(err, "failed to get next round, %v", err)
bot.SetNextRound(nextRound)

bih := blockImporterHandler{imp: importer.NewImporter(db)}
bot.AddBlockHandler(&bih)
bot.SetContext(ctx)
imp := importer.NewImporter(db)
handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
return handleBlock(block, &imp)
}
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
bot.Run()
cf()
err = bot.Run(ctx)
if err != nil {
// If context is not expired.
if ctx.Err() == nil {
logger.WithError(err).Errorf("fetcher exited with error")
os.Exit(1)
}
}
}()
} else {
logger.Info("No block importer configured.")
Expand All @@ -112,6 +126,7 @@ var daemonCmd = &cobra.Command{
fmt.Printf("serving on %s\n", daemonServerAddr)
logger.Infof("serving on %s", daemonServerAddr)
api.Serve(ctx, daemonServerAddr, db, bot, logger, makeOptions())
wg.Wait()
},
}

Expand Down Expand Up @@ -159,14 +174,14 @@ func makeOptions() (options api.ExtraOptions) {
return
}

type blockImporterHandler struct {
imp importer.Importer
}

func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error {
start := time.Now()
err := bih.imp.ImportBlock(block)
maybeFail(err, "adding block %d to database failed", block.Block.Round())
err := imp.ImportBlock(block)
if err != nil {
logger.WithError(err).Errorf(
"adding block %d to database failed", block.Block.Round())
return fmt.Errorf("handleBlock() err: %w", err)
}
dt := time.Since(start)

// Ignore round 0 (which is empty).
Expand All @@ -177,4 +192,6 @@ func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
}

logger.Infof("round r=%d (%d txn) imported in %s", block.Block.Round(), len(block.Block.Payset), dt.String())

return nil
}
1 change: 1 addition & 0 deletions cmd/algorand-indexer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var importCmd = &cobra.Command{
}

db, availableCh := indexerDbFromFlags(idb.IndexerDbOptions{})
defer db.Close()
<-availableCh

helper := importer.NewImportHelper(
Expand Down
21 changes: 6 additions & 15 deletions cmd/import-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ import (
"github.com/algorand/indexer/util"
)

type blockHandler struct {
f func(*rpcs.EncodedBlockCert) error
}

func (h blockHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
err := h.f(block)
if err != nil {
fmt.Printf("error handling block %d err: %v\n", block.Block.Round(), err)
os.Exit(1)
}
}

func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) {
data, err := client.BlockRaw(0).Do(context.Background())
if err != nil {
Expand Down Expand Up @@ -278,7 +266,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg
nextRoundIndexer, nextRoundLedger)
}

blockHandlerFunc := func(block *rpcs.EncodedBlockCert) error {
blockHandlerFunc := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
var modifiedAccounts []basics.Address
var err0 error
var err1 error
Expand Down Expand Up @@ -321,9 +309,12 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg

return checkModifiedAccounts(db, l, &block.Block, modifiedAccounts)
}
bot.AddBlockHandler(blockHandler{f: blockHandlerFunc})
bot.SetBlockHandler(blockHandlerFunc)
bot.SetNextRound(nextRoundLedger)
bot.Run()
err = bot.Run(context.Background())
if err != nil {
return fmt.Errorf("catchup err: %w", err)
}

return nil
}
Expand Down
Loading

0 comments on commit 7655717

Please sign in to comment.