Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add waits/retries to algod importer #1178

Merged
merged 3 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ codecov:
require_ci_to_pass: no
branch: develop

ignore:
- "idb/mocks"
- "idb/dummy"
- "util/test"

coverage:
precision: 2
round: down
Expand Down
86 changes: 30 additions & 56 deletions fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,21 @@ package fetcher
import (
"context"
"fmt"
"github.com/algorand/indexer/util/test"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand-sdk/client/v2/algod"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
)

type AlgodHandler struct {
mock.Mock
}

func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler.Called(w, req)
return
}

type BlockHandler struct {
mock.Mock
}
Expand All @@ -36,86 +27,69 @@ func (handler *BlockHandler) handlerFunc(ctx context.Context, cert *rpcs.Encoded
return args.Error(0)
}

func mockAClient(t *testing.T, algodHandler *AlgodHandler) *algod.Client {
mockServer := httptest.NewServer(algodHandler)
aclient, err := algod.MakeClient(mockServer.URL, "")
if err != nil {
t.FailNow()
}
return aclient
}

func TestFetcherImplErrorInitialization(t *testing.T) {
aclient := mockAClient(t, &AlgodHandler{})
aclient, err := test.MockAClient(test.NewAlgodHandler())
assert.NoError(t, err)
fetcher := &fetcherImpl{aclient: aclient, log: logrus.New()}
require.Equal(t, "", fetcher.Error(), "Initialization of fetcher caused an unexpected error.")
}

func TestFetcherImplAlgodReturnsClient(t *testing.T) {
aclient := mockAClient(t, &AlgodHandler{})
aclient, err := test.MockAClient(test.NewAlgodHandler())
assert.NoError(t, err)
fetcher := &fetcherImpl{aclient: aclient, log: logrus.New()}
require.Equal(t, aclient, fetcher.Algod(), "Algod client returned from fetcherImpl does not match expected instance.")
}

func TestFetcherImplSetError(t *testing.T) {
aclient := mockAClient(t, &AlgodHandler{})
aclient, err := test.MockAClient(test.NewAlgodHandler())
assert.NoError(t, err)
fetcher := &fetcherImpl{aclient: aclient, log: logrus.New()}
expectedErr := fmt.Errorf("foobar")
fetcher.setError(expectedErr)
require.Equal(t, expectedErr.Error(), fetcher.Error(), "Error produced by setError was not reflected in Error output.")
}

func TestFetcherImplProcessQueueHandlerError(t *testing.T) {
mockAlgodHandler := &AlgodHandler{}
aclient := mockAClient(t, mockAlgodHandler)
aclient, err := test.MockAClient(test.NewAlgodHandler(test.BlockResponder))
assert.NoError(t, err)
fetcher := &fetcherImpl{aclient: aclient, log: logrus.New()}
bHandler := &BlockHandler{}
expectedError := fmt.Errorf("handlerError")
// The block handler function will immediately return an error on any block passed to it
bHandler.On("handlerFunc", mock.Anything, mock.Anything).Return(expectedError)
fetcher.SetBlockHandler(bHandler.handlerFunc)
// Mock algod server to continually return empty blocks
mockAlgodHandler.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
respWriter := args.Get(0).(http.ResponseWriter)
req := args.Get(1).(*http.Request)
path := req.URL.Path
if strings.Contains(path, "v2/blocks/") {
var block bookkeeping.Block
respWriter.Write(protocol.Encode(&block))
}
})
require.ErrorIsf(t, fetcher.Run(context.Background()), expectedError, "FetcherImpl did not return expected error in processQueue handler.")
}

func TestFetcherImplCatchupLoopBlockError(t *testing.T) {
mockAlgodHandler := &AlgodHandler{}
aclient := mockAClient(t, mockAlgodHandler)
passingCalls := 5
aclient, err := test.MockAClient(test.NewAlgodHandler(
// Our mock algod client will process /v2/blocks/{round} calls
// returning an empty block `passingCalls` times before throwing 500s
func(path string, w http.ResponseWriter) bool {
if strings.Contains(path, "v2/blocks/") {
if passingCalls == 0 {
w.WriteHeader(http.StatusInternalServerError)
} else {
var block bookkeeping.Block
w.WriteHeader(http.StatusOK)
w.Write(protocol.Encode(&block))
passingCalls--
}
return true
}
return false
}),
)
assert.NoError(t, err)
// Initializing blockQueue here needs buffer since we have no other goroutines receiving from it
fetcher := &fetcherImpl{aclient: aclient, log: logrus.New(), blockQueue: make(chan *rpcs.EncodedBlockCert, 256)}
bHandler := &BlockHandler{}
// the handler will do nothing here
bHandler.On("handlerFunc", mock.Anything, mock.Anything).Return(nil)
fetcher.SetBlockHandler(bHandler.handlerFunc)

// Our mock algod client will process /v2/blocks/{round} calls
// returning an empty block `passingCalls` times before throwing 500s
mockAlgodHandler.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
respWriter := args.Get(0).(http.ResponseWriter)
req := args.Get(1).(*http.Request)
path := req.URL.Path
if strings.Contains(path, "v2/blocks/") {
if passingCalls <= 0 {
respWriter.WriteHeader(http.StatusInternalServerError)
} else {
var block bookkeeping.Block
respWriter.WriteHeader(http.StatusOK)
respWriter.Write(protocol.Encode(&block))
passingCalls--
}
}
})
err := fetcher.catchupLoop(context.Background())
err = fetcher.catchupLoop(context.Background())
require.NoError(t, err, "FetcherImpl returned an unexpected error from catchupLoop")
require.Equal(t, "", fetcher.Error(), "FetcherImpl set an unexpected error from algod client during catchupLoop")
}
38 changes: 27 additions & 11 deletions importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,35 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var err error
var blk data.BlockData
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
if err != nil {
return blk, err
}
tmpBlk := new(rpcs.EncodedBlockCert)
err = protocol.Decode(blockbytes, tmpBlk)

blk = data.BlockData{
BlockHeader: tmpBlk.Block.BlockHeader,
Payset: tmpBlk.Block.Payset,
Certificate: &tmpBlk.Certificate,
for retries := 0; retries < 3; retries++ {
// nextRound - 1 because the endpoint waits until the subsequent block is committed to return
_, err = algodImp.aclient.StatusAfterBlock(rnd - 1).Do(algodImp.ctx)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
algodImp.logger.Errorf(
"r=%d error getting status %d", retries, rnd)
continue
}
blockbytes, err = algodImp.aclient.BlockRaw(rnd).Do(algodImp.ctx)
if err != nil {
return blk, err
}
tmpBlk := new(rpcs.EncodedBlockCert)
err = protocol.Decode(blockbytes, tmpBlk)

blk = data.BlockData{
BlockHeader: tmpBlk.Block.BlockHeader,
Payset: tmpBlk.Block.Payset,
Certificate: &tmpBlk.Certificate,
}
return blk, err
}
return blk, err
algodImp.logger.Error("GetBlock finished retries without fetching a block.")
return blk, fmt.Errorf("finished retries without fetching a block")
}

func (algodImp *algodImporter) unmarhshalConfig(cfg string) error {
Expand Down
102 changes: 46 additions & 56 deletions importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,14 @@ package algodimporter

import (
"context"
"net/http"
"net/http/httptest"
"path"
"strconv"
"strings"
"os"
"testing"

"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-codec/codec"
"github.com/algorand/indexer/importers"
"github.com/algorand/indexer/plugins"
"github.com/algorand/indexer/util/test"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"
)
Expand All @@ -30,49 +22,12 @@ var (
)

func init() {
logger, _ = test.NewNullLogger()
logger = logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
ctx, cancel = context.WithCancel(context.Background())
}

func MockAlgodServerReturnsJustGenesis() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

if strings.Contains(r.URL.Path, "/genesis") {
w.WriteHeader(http.StatusOK)
genesis := &bookkeeping.Genesis{}
blockbytes := protocol.EncodeJSON(*genesis)
w.Write(blockbytes)
} else {
w.WriteHeader(http.StatusBadRequest)
}
}))
}

func MockAlgodServerReturnsGenesisAndEmptyBlock() *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

if strings.Contains(r.URL.Path, "/genesis") {
w.WriteHeader(http.StatusOK)
genesis := &bookkeeping.Genesis{}
blockbytes := protocol.EncodeJSON(*genesis)
w.Write(blockbytes)
} else {
rnd, _ := strconv.Atoi(path.Base(r.URL.Path))
blk := rpcs.EncodedBlockCert{Block: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: basics.Round(rnd)}}}
var blockbytes []byte
w.WriteHeader(http.StatusOK)
response := struct {
Block bookkeeping.Block `codec:"block"`
}{
Block: blk.Block,
}
enc := codec.NewEncoderBytes(&blockbytes, protocol.CodecHandle)
enc.Encode(response)
w.Write(blockbytes)
}
}))
}

func TestImporterorterMetadata(t *testing.T) {
testImporter = New()
metadata := testImporter.Metadata()
Expand All @@ -83,7 +38,7 @@ func TestImporterorterMetadata(t *testing.T) {
}

func TestCloseSuccess(t *testing.T) {
ts := MockAlgodServerReturnsJustGenesis()
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
Expand All @@ -92,7 +47,7 @@ func TestCloseSuccess(t *testing.T) {
}

func TestInitSuccess(t *testing.T) {
ts := MockAlgodServerReturnsJustGenesis()
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
Expand All @@ -117,8 +72,8 @@ func TestConfigDefault(t *testing.T) {
assert.Equal(t, plugins.PluginConfig(expected), testImporter.Config())
}

func TestGetBlockFailure(t *testing.T) {
ts := MockAlgodServerReturnsJustGenesis()
func TestWaitForBlockBlockFailure(t *testing.T) {
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
Expand All @@ -130,7 +85,11 @@ func TestGetBlockFailure(t *testing.T) {
}

func TestGetBlockSuccess(t *testing.T) {
ts := MockAlgodServerReturnsGenesisAndEmptyBlock()
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockResponder,
test.BlockAfterResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
Expand All @@ -142,3 +101,34 @@ func TestGetBlockSuccess(t *testing.T) {
assert.True(t, downloadedBlk.Empty())
cancel()
}

func TestGetBlockContextCancelled(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockResponder,
test.BlockAfterResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

cancel()
_, err = testImporter.GetBlock(uint64(10))
assert.Error(t, err)
}

func TestGetBlockFailure(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockAfterResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.PluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

_, err = testImporter.GetBlock(uint64(10))
assert.Error(t, err)
cancel()
}
Loading