Skip to content

Commit

Permalink
Stop block import gracefully.
Browse files Browse the repository at this point in the history
  • Loading branch information
tolikzinovyev committed Nov 30, 2021
1 parent dbfc5c5 commit 8569dd2
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 117 deletions.
33 changes: 22 additions & 11 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,21 @@ var daemonCmd = &cobra.Command{
maybeFail(err, "failed to get next round, %v", err)
bot.SetNextRound(nextRound)

bih := blockImporterHandler{imp: importer.NewImporter(db)}
bot.AddBlockHandler(&bih)
bot.SetContext(ctx)
imp := importer.NewImporter(db)
handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
return handleBlock(block, &imp)
}
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
bot.Run()
err = bot.Run(ctx)
if err != nil {
// If context is not expired.
if ctx.Err() == nil {
logger.WithError(err).Errorf("fetcher exited with error")
os.Exit(1)
}
}
}()
} else {
logger.Info("No block importer configured.")
Expand Down Expand Up @@ -165,14 +174,14 @@ func makeOptions() (options api.ExtraOptions) {
return
}

type blockImporterHandler struct {
imp importer.Importer
}

func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error {
start := time.Now()
err := bih.imp.ImportBlock(block)
maybeFail(err, "adding block %d to database failed", block.Block.Round())
err := imp.ImportBlock(block)
if err != nil {
logger.WithError(err).Errorf(
"adding block %d to database failed", block.Block.Round())
return fmt.Errorf("handleBlock() err: %w", err)
}
dt := time.Since(start)

// Ignore round 0 (which is empty).
Expand All @@ -183,4 +192,6 @@ func (bih *blockImporterHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
}

logger.Infof("round r=%d (%d txn) imported in %s", block.Block.Round(), len(block.Block.Payset), dt.String())

return nil
}
21 changes: 6 additions & 15 deletions cmd/import-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ import (
"github.com/algorand/indexer/util"
)

type blockHandler struct {
f func(*rpcs.EncodedBlockCert) error
}

func (h blockHandler) HandleBlock(block *rpcs.EncodedBlockCert) {
err := h.f(block)
if err != nil {
fmt.Printf("error handling block %d err: %v\n", block.Block.Round(), err)
os.Exit(1)
}
}

func getGenesisBlock(client *algod.Client) (bookkeeping.Block, error) {
data, err := client.BlockRaw(0).Do(context.Background())
if err != nil {
Expand Down Expand Up @@ -278,7 +266,7 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg
nextRoundIndexer, nextRoundLedger)
}

blockHandlerFunc := func(block *rpcs.EncodedBlockCert) error {
blockHandlerFunc := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
var modifiedAccounts []basics.Address
var err0 error
var err1 error
Expand Down Expand Up @@ -321,9 +309,12 @@ func catchup(db *postgres.IndexerDb, l *ledger.Ledger, bot fetcher.Fetcher, logg

return checkModifiedAccounts(db, l, &block.Block, modifiedAccounts)
}
bot.AddBlockHandler(blockHandler{f: blockHandlerFunc})
bot.SetBlockHandler(blockHandlerFunc)
bot.SetNextRound(nextRoundLedger)
bot.Run()
err = bot.Run(context.Background())
if err != nil {
return fmt.Errorf("catchup err: %w", err)
}

return nil
}
Expand Down
166 changes: 75 additions & 91 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,24 @@ type Fetcher interface {
Algod() *algod.Client

// go bot.Run()
Run()
Run(ctx context.Context) error

AddBlockHandler(handler BlockHandler)
SetContext(ctx context.Context)
SetBlockHandler(f func(context.Context, *rpcs.EncodedBlockCert) error)
SetNextRound(nextRound uint64)

// Error returns any error fetcher is currently experiencing.
Error() string
}

// BlockHandler is the handler fetcher uses to process a block.
type BlockHandler interface {
HandleBlock(block *rpcs.EncodedBlockCert)
}

type fetcherImpl struct {
algorandData string
aclient *algod.Client
algodLastmod time.Time // newest mod time of algod.net algod.token

blockHandlers []BlockHandler
handler func(context.Context, *rpcs.EncodedBlockCert) error

nextRound uint64

ctx context.Context
done bool

failingSince time.Time

log *log.Logger
Expand Down Expand Up @@ -76,71 +67,60 @@ func (bot *fetcherImpl) Algod() *algod.Client {
return bot.aclient
}

func (bot *fetcherImpl) isDone() bool {
if bot.done {
return true
}
if bot.ctx == nil {
return false
}
select {
case <-bot.ctx.Done():
bot.done = true
return true
default:
return false
}
}

func (bot *fetcherImpl) setError(err error) {
bot.errmu.Lock()
bot.err = err
bot.errmu.Unlock()
}

func (bot *fetcherImpl) processQueue() {
func (bot *fetcherImpl) processQueue(ctx context.Context) error {
for {
block, ok := <-bot.blockQueue
if !ok {
return
select {
case block, ok := <-bot.blockQueue:
if !ok {
return nil
}
err := bot.handler(ctx, block)
if err != nil {
return fmt.Errorf("processQueue() handler err: %w", err)
}
case <-ctx.Done():
return fmt.Errorf("processQueue: ctx.Err(): %w", ctx.Err())
}
bot.handleBlock(block)
}
}

func (bot *fetcherImpl) enqueueBlock(blockbytes []byte) error {
block := new(rpcs.EncodedBlockCert)
err := protocol.Decode(blockbytes, block)
if err != nil {
return nil
return fmt.Errorf("enqueueBlock() decode err: %w", err)
}

bot.blockQueue <- block
return nil
}

// fetch the next block by round number until we find one missing (because it doesn't exist yet)
func (bot *fetcherImpl) catchupLoop() {
func (bot *fetcherImpl) catchupLoop(ctx context.Context) error {
var err error
var blockbytes []byte
aclient := bot.Algod()
for {
if bot.isDone() {
return
}

blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(context.Background())
blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(ctx)
if err != nil {
// If context has expired.
if ctx.Err() != nil {
return fmt.Errorf("catchupLoop() fetch err: %w", err)
}
bot.setError(err)
bot.log.WithError(err).Errorf("catchup block %d", bot.nextRound)
return
return nil
}

err = bot.enqueueBlock(blockbytes)
if err != nil {
bot.setError(err)
bot.log.WithError(err).Errorf("error enqueuing catchup block %d", bot.nextRound)
return
return fmt.Errorf("catchupLoop() err: %w", err)
}
// If we successfully handle the block, clear out any transient error which may have occurred.
bot.setError(nil)
Expand All @@ -150,35 +130,37 @@ func (bot *fetcherImpl) catchupLoop() {
}

// wait for algod to notify of a new round, then fetch that block
func (bot *fetcherImpl) followLoop() {
func (bot *fetcherImpl) followLoop(ctx context.Context) error {
var err error
var blockbytes []byte
aclient := bot.Algod()
for {
for retries := 0; retries < 3; retries++ {
if bot.isDone() {
return
}
_, err = aclient.StatusAfterBlock(bot.nextRound).Do(context.Background())
_, err = aclient.StatusAfterBlock(bot.nextRound).Do(ctx)
if err != nil {
bot.log.WithError(err).Errorf("r=%d error getting status %d", retries, bot.nextRound)
// If context has expired.
if ctx.Err() != nil {
return fmt.Errorf("followLoop() status err: %w", err)
}
bot.log.WithError(err).Errorf(
"r=%d error getting status %d", retries, bot.nextRound)
continue
}
blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(context.Background())
blockbytes, err = aclient.BlockRaw(bot.nextRound).Do(ctx)
if err == nil {
break
} else if ctx.Err() != nil { // if context has expired
return fmt.Errorf("followLoop() fetch block err: %w", err)
}
bot.log.WithError(err).Errorf("r=%d err getting block %d", retries, bot.nextRound)
}
if err != nil {
bot.setError(err)
return
return nil
}
err = bot.enqueueBlock(blockbytes)
if err != nil {
bot.setError(err)
bot.log.WithError(err).Errorf("error enqueuing follow block %d", bot.nextRound)
break
return fmt.Errorf("followLoop() err: %w", err)
}
// Clear out any transient error which may have occurred.
bot.setError(nil)
Expand All @@ -187,21 +169,15 @@ func (bot *fetcherImpl) followLoop() {
}
}

// Run is part of the Fetcher interface
func (bot *fetcherImpl) Run() {
// In theory a buffer of size one should be enough, but let's make it bigger.
bot.blockQueue = make(chan *rpcs.EncodedBlockCert, 5)
defer close(bot.blockQueue)
go bot.processQueue()

func (bot *fetcherImpl) mainLoop(ctx context.Context) error {
for {
if bot.isDone() {
return
err := bot.catchupLoop(ctx)
if err != nil {
return fmt.Errorf("mainLoop() err: %w", err)
}
bot.catchupLoop()
bot.followLoop()
if bot.isDone() {
return
err = bot.followLoop(ctx)
if err != nil {
return fmt.Errorf("mainLoop() err: %w", err)
}

if bot.failingSince.IsZero() {
Expand All @@ -212,7 +188,7 @@ func (bot *fetcherImpl) Run() {
bot.log.Warnf("failing to fetch from algod for %s, (since %s, now %s)", dt.String(), bot.failingSince.String(), now.String())
}
time.Sleep(5 * time.Second)
err := bot.reclient()
err = bot.reclient()
if err != nil {
bot.setError(err)
bot.log.WithError(err).Errorln("err trying to re-client")
Expand All @@ -222,36 +198,44 @@ func (bot *fetcherImpl) Run() {
}
}

// SetContext is part of the Fetcher interface
func (bot *fetcherImpl) SetContext(ctx context.Context) {
bot.ctx = ctx
// Run is part of the Fetcher interface
func (bot *fetcherImpl) Run(ctx context.Context) error {
// In theory a buffer of size one should be enough, but let's make it bigger.
bot.blockQueue = make(chan *rpcs.EncodedBlockCert, 5)

ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()

ch0 := make(chan error)
go func() {
ch0 <- bot.processQueue(ctx)
}()

ch1 := make(chan error)
go func() {
ch1 <- bot.mainLoop(ctx)
}()

select {
case err := <-ch0:
cancelFunc()
<-ch1
return fmt.Errorf("Run() err: %w", err)
case err := <-ch1:
cancelFunc()
<-ch0
return fmt.Errorf("Run() err: %w", err)
}
}

// SetNextRound is part of the Fetcher interface
func (bot *fetcherImpl) SetNextRound(nextRound uint64) {
bot.nextRound = nextRound
}

func (bot *fetcherImpl) handleBlock(block *rpcs.EncodedBlockCert) {
for _, handler := range bot.blockHandlers {
handler.HandleBlock(block)
}
}

// AddBlockHandler is part of the Fetcher interface
func (bot *fetcherImpl) AddBlockHandler(handler BlockHandler) {
if bot.blockHandlers == nil {
x := make([]BlockHandler, 1, 10)
x[0] = handler
bot.blockHandlers = x
return
}
for _, oh := range bot.blockHandlers {
if oh == handler {
return
}
}
bot.blockHandlers = append(bot.blockHandlers, handler)
func (bot *fetcherImpl) SetBlockHandler(handler func(context.Context, *rpcs.EncodedBlockCert) error) {
bot.handler = handler
}

// ForDataDir initializes Fetcher to read data from the data directory.
Expand Down

0 comments on commit 8569dd2

Please sign in to comment.