Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry loop to block handler. #823

Merged
merged 7 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions cmd/algorand-indexer/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@ var daemonCmd = &cobra.Command{
bot.SetNextRound(nextRound)

imp := importer.NewImporter(db)
handler := func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
return handleBlock(block, &imp)
}
handler := blockHandler(imp, 1*time.Second)
bot.SetBlockHandler(handler)

logger.Info("Starting block importer.")
Expand Down Expand Up @@ -174,7 +172,29 @@ func makeOptions() (options api.ExtraOptions) {
return
}

func handleBlock(block *rpcs.EncodedBlockCert, imp *importer.Importer) error {
// blockHandler creates a handler complying to the fetcher block handler interface. In case of a failure it keeps
// attempting to add the block until the fetcher shuts down.
func blockHandler(imp importer.Importer, retryDelay time.Duration) func(context.Context, *rpcs.EncodedBlockCert) error {
return func(ctx context.Context, block *rpcs.EncodedBlockCert) error {
for {
err := handleBlock(block, imp)
if err == nil {
// return on success.
return nil
}

// Delay or terminate before next attempt.
select {
case <-ctx.Done():
return err
case <-time.After(retryDelay):
break
}
}
}
}

func handleBlock(block *rpcs.EncodedBlockCert, imp importer.Importer) error {
start := time.Now()
err := imp.ImportBlock(block)
if err != nil {
Expand Down
64 changes: 64 additions & 0 deletions cmd/algorand-indexer/daemon_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/rpcs"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

type mockImporter struct {
}

var errMockImportBlock = errors.New("mock import block error")

func (imp *mockImporter) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
return errMockImportBlock
}

func TestImportRetryAndCancel(t *testing.T) {
// connect debug logger
nullLogger, hook := test.NewNullLogger()
logger = nullLogger

// cancellable context
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

// create handler with mock importer and start, it should generate an error every second until cancelled.
imp := &mockImporter{}
handler := blockHandler(imp, 50*time.Millisecond)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
block := rpcs.EncodedBlockCert{
Block: bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: 1234,
},
},
}
handler(cctx, &block)
}()

// accumulate some errors
for len(hook.Entries) < 5 {
time.Sleep(25 * time.Millisecond)
}

for _, entry := range hook.Entries {
assert.Equal(t, entry.Message, "adding block 1234 to database failed")
assert.Equal(t, entry.Data["error"], errMockImportBlock)
}

// Wait for handler to exit.
cancel()
wg.Wait()
}
8 changes: 4 additions & 4 deletions importer/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (h *ImportHelper) Import(db idb.IndexerDb, args []string) {
pathsSorted = pathsSorted[:h.BlockFileLimit]
}
for _, gfname := range pathsSorted {
fb, ft := importFile(gfname, &imp, h.Log)
fb, ft := importFile(gfname, imp, h.Log)
blocks += fb
txCount += ft
}
} else {
// try without passing throug glob
fb, ft := importFile(fname, &imp, h.Log)
fb, ft := importFile(fname, imp, h.Log)
blocks += fb
txCount += ft
}
Expand All @@ -92,7 +92,7 @@ func maybeFail(err error, l *log.Logger, errfmt string, params ...interface{}) {
os.Exit(1)
}

func importTar(imp *Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCount int, err error) {
func importTar(imp Importer, tarfile io.Reader, l *log.Logger) (blockCount, txCount int, err error) {
tf := tar.NewReader(tarfile)
var header *tar.Header
header, err = tf.Next()
Expand Down Expand Up @@ -138,7 +138,7 @@ func importTar(imp *Importer, tarfile io.Reader, l *log.Logger) (blockCount, txC
return
}

func importFile(fname string, imp *Importer, l *log.Logger) (blocks, txCount int) {
func importFile(fname string, imp Importer, l *log.Logger) (blocks, txCount int) {
blocks = 0
txCount = 0
l.Infof("importing %s ...", fname)
Expand Down
10 changes: 7 additions & 3 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ import (
)

// Importer is used to import blocks into an idb.IndexerDb object.
type Importer struct {
type Importer interface {
ImportBlock(blockContainer *rpcs.EncodedBlockCert) error
}

type importerImpl struct {
db idb.IndexerDb
}

// ImportBlock processes a block and adds it to the IndexerDb
func (imp *Importer) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
func (imp *importerImpl) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {
block := &blockContainer.Block

_, ok := config.Consensus[block.CurrentProtocol]
Expand All @@ -27,5 +31,5 @@ func (imp *Importer) ImportBlock(blockContainer *rpcs.EncodedBlockCert) error {

// NewImporter creates a new importer object.
func NewImporter(db idb.IndexerDb) Importer {
return Importer{db: db}
return &importerImpl{db: db}
}