Skip to content

Commit

Permalink
Add retry loop to block handler. (#823)
Browse files Browse the repository at this point in the history
  • Loading branch information
winder authored Jan 20, 2022
1 parent 1313f26 commit 28b5e17
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
28 changes: 24 additions & 4 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ var daemonCmd = &cobra.Command{
bot.SetNextRound(nextRound)

imp := importer.NewImporter(db)
handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
return handleBlock(block, &imp)
}
handler := blockHandler(imp, 1*time.Second)
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
Expand Down Expand Up @@ -174,7 +172,29 @@ func makeOptions() (options api.ExtraOptions) {
return
}

func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error {
// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
for {
err := handleBlock(block, imp)
if err == nil {
// return on success.
return nil
}

// Delay or terminate before next attempt.
select {
case <-ctx.Done():
return err
case <-time.After(retryDelay):
break
}
}
}
}

func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error {
start := time.Now()
err := imp.ImportBlock(block)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions cmd/algorand-indexer/daemon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/rpcs"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

type mockImporter struct {
}

var errMockImportBlock = errors.New("mock import block error")

func (imp *mockImporter) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
return errMockImportBlock
}

func TestImportRetryAndCancel(t *testing.T) {
// connect debug logger
nullLogger, hook := test.NewNullLogger()
logger = nullLogger

// cancellable context
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

// create handler with mock importer and start, it should generate errors until cancelled.
imp := &mockImporter{}
handler := blockHandler(imp, 50*time.Millisecond)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
block := rpcs.EncodedBlockCert{
Block: bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: 1234,
},
},
}
handler(cctx, &block)
}()

// accumulate some errors
for len(hook.Entries) < 5 {
time.Sleep(25 * time.Millisecond)
}

for _, entry := range hook.Entries {
assert.Equal(t, entry.Message, "adding block 1234 to database failed")
assert.Equal(t, entry.Data["error"], errMockImportBlock)
}

// Wait for handler to exit.
cancel()
wg.Wait()
}
8 changes: 4 additions & 4 deletions importer/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (h *ImportHelper) Import(db idb.IndexerDb, args []string) {
pathsSorted = pathsSorted[:h.BlockFileLimit]
}
for _, gfname := range pathsSorted {
fb, ft := importFile(gfname, &imp, h.Log)
fb, ft := importFile(gfname, imp, h.Log)
blocks += fb
txCount += ft
}
} else {
// try without passing throug glob
fb, ft := importFile(fname, &imp, h.Log)
fb, ft := importFile(fname, imp, h.Log)
blocks += fb
txCount += ft
}
Expand All @@ -92,7 +92,7 @@ func maybeFail(err error, l *log.Logger, errfmt string, params ...interface{}) {
os.Exit(1)
}

func importTar(imp *Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCount int, err error) {
func importTar(imp Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCount int, err error) {
tf := tar.NewReader(tarfile)
var header *tar.Header
header, err = tf.Next()
Expand Down Expand Up @@ -138,7 +138,7 @@ func importTar(imp *Importer, tarfile io.Reader, l *log.Logger) (blockCount, txC
return
}

func importFile(fname string, imp *Importer, l *log.Logger) (blocks, txCount int) {
func importFile(fname string, imp Importer, l *log.Logger) (blocks, txCount int) {
blocks = 0
txCount = 0
l.Infof("importing %s ...", fname)
Expand Down
10 changes: 7 additions & 3 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (
)

// Importer is used to import blocks into an idb.IndexerDb object.
type Importer struct {
type Importer interface {
ImportBlock(blockContainer *rpcs.EncodedBlockCert) error
}

type importerImpl struct {
db idb.IndexerDb
}

// ImportBlock processes a block and adds it to the IndexerDb
func (imp *Importer) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
func (imp *importerImpl) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
block := &blockContainer.Block

_, ok := config.Consensus[block.CurrentProtocol]
Expand All @@ -27,5 +31,5 @@ func (imp *Importer) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {

// NewImporter creates a new importer object.
func NewImporter(db idb.IndexerDb) Importer {
return Importer{db: db}
return &importerImpl{db: db}
}

0 comments on commit 28b5e17

Please sign in to comment.