Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IndexerDb.Close() #801

Merged
merged 7 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ func setupIdb(t *testing.T, genesis bookkeeping.Genesis, genesisBlock bookkeepin
db, _, err := postgres.OpenPostgres(connStr, idb.IndexerDbOptions{}, nil)
require.NoError(t, err)

newShutdownFunc := func() {
db.Close()
shutdownFunc()
}

err = db.LoadGenesis(genesis)
require.NoError(t, err)

err = db.AddBlock(&genesisBlock)
require.NoError(t, err)

return db, shutdownFunc
return db, newShutdownFunc
}

func TestApplicationHander(t *testing.T) {
Expand Down
41 changes: 29 additions & 12 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -83,8 +84,13 @@ var daemonCmd = &cobra.Command{
opts.ReadOnly = true
}
db, availableCh := indexerDbFromFlags(opts)
defer db.Close()
var wg sync.WaitGroup
if bot != nil {
wg.Add(1)
go func() {
defer wg.Done()

// Wait until the database is available.
<-availableCh

Expand All @@ -97,13 +103,21 @@ var daemonCmd = &cobra.Command{
maybeFail(err, "failed to get next round, %v", err)
bot.SetNextRound(nextRound)

bih := blockImporterHandler{imp: importer.NewImporter(db)}
bot.AddBlockHandler(&bih)
bot.SetContext(ctx)
imp := importer.NewImporter(db)
handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
return handleBlock(block, &imp)
}
bot.SetBlockHandler(handler)
Comment on lines +106 to +110
Copy link
Contributor

@winder winder Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I slightly prefer the interface object over the closure here (up to you though).

AddBlockHandler -> SetBlockHandler is a nice simplification

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? Interface objects are harder to construct at the user side.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The terseness mainly, you could create/set it in one line if you wanted. Here you must create the importer object separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about terseness. With a closure, looking at the function signature, you immediately see what the closure is. Plus, an interface is a set of functions. That's redundant.


logger.Info("Starting block importer.")
bot.Run()
cf()
winder marked this conversation as resolved.
Show resolved Hide resolved
err = bot.Run(ctx)
if err != nil {
// If context is not expired.
if ctx.Err() == nil {
logger.WithError(err).Errorf("fetcher exited with error")
os.Exit(1)
}
}
}()
} else {
logger.Info("No block importer configured.")
Expand All @@ -112,6 +126,7 @@ var daemonCmd = &cobra.Command{
fmt.Printf("serving on %s\n", daemonServerAddr)
logger.Infof("serving on %s", daemonServerAddr)
api.Serve(ctx, daemonServerAddr, db, bot, logger, makeOptions())
wg.Wait()
},
}

Expand Down Expand Up @@ -159,14 +174,14 @@ func makeOptions() (options api.ExtraOptions) {
return
}

type blockImporterHandler struct {
imp importer.Importer
}

func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error {
start := time.Now()
err := bih.imp.ImportBlock(block)
maybeFail(err, "adding block %d to database failed", block.Block.Round())
err := imp.ImportBlock(block)
if err != nil {
logger.WithError(err).Errorf(
"adding block %d to database failed", block.Block.Round())
return fmt.Errorf("handleBlock() err: %w", err)
}
dt := time.Since(start)

// Ignore round 0 (which is empty).
Expand All @@ -177,4 +192,6 @@ func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
}

logger.Infof("round r=%d (%d txn) imported in %s", block.Block.Round(), len(block.Block.Payset), dt.String())

return nil
}
1 change: 1 addition & 0 deletions cmd/algorand-indexer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var importCmd = &cobra.Command{
}

db, availableCh := indexerDbFromFlags(idb.IndexerDbOptions{})
defer db.Close()
<-availableCh

helper := importer.NewImportHelper(
Expand Down
21 changes: 6 additions & 15 deletions cmd/import-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ import (
"github.com/algorand/indexer/util"
)

type blockHandler struct {
f func(*rpcs.EncodedBlockCert) error
}

func (h blockHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
err := h.f(block)
if err != nil {
fmt.Printf("error handling block %d err: %v\n", block.Block.Round(), err)
os.Exit(1)
}
}

func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) {
data, err := client.BlockRaw(0).Do(context.Background())
if err != nil {
Expand Down Expand Up @@ -278,7 +266,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg
nextRoundIndexer, nextRoundLedger)
}

blockHandlerFunc := func(block *rpcs.EncodedBlockCert) error {
blockHandlerFunc := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
var modifiedAccounts []basics.Address
var err0 error
var err1 error
Expand Down Expand Up @@ -321,9 +309,12 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg

return checkModifiedAccounts(db, l, &block.Block, modifiedAccounts)
}
bot.AddBlockHandler(blockHandler{f: blockHandlerFunc})
bot.SetBlockHandler(blockHandlerFunc)
bot.SetNextRound(nextRoundLedger)
bot.Run()
err = bot.Run(context.Background())
if err != nil {
return fmt.Errorf("catchup err: %w", err)
}

return nil
}
Expand Down
Loading