Skip to content

New Feature: Adds the sync mode importer #1415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
6 changes: 3 additions & 3 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import (
"testing"
"time"

"github.com/algorand/avm-abi/apps"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/avm-abi/apps"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres"
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/importers/algod/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
var (
GetAlgodRawBlockTimeSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: "indexer_daemon",
Subsystem: "algod_importer",
Name: GetAlgodRawBlockTimeName,
Help: "Total response time from Algod's raw block endpoint in seconds.",
})
Expand Down
172 changes: 172 additions & 0 deletions conduit/plugins/importers/algodfollower/algodfollower_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package algodfollower

import (
"context"
_ "embed"
"fmt"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/indexer/conduit"
"github.com/algorand/indexer/conduit/plugins"
"github.com/algorand/indexer/conduit/plugins/importers"
"github.com/algorand/indexer/data"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
)

const importerName = "algod_follower"

type algodFollowerImporter struct {
aclient *algod.Client
logger *logrus.Logger
cfg Config
ctx context.Context
cancel context.CancelFunc
}

func (af *algodFollowerImporter) OnComplete(input data.BlockData) error {
_, err := af.aclient.SetSyncRound(input.Round() + 1).Do(af.ctx)
return err
}

//go:embed sample.yaml
var sampleConfig string

var algodFollowerImporterMetadata = conduit.Metadata{
Name: importerName,
Description: "Importer for fetching blocks from an algod REST API using sync round and ledger delta algod calls.",
Deprecated: false,
SampleConfig: sampleConfig,
}

// New initializes an algod importer
func New() importers.Importer {
return &algodFollowerImporter{}
}

func (af *algodFollowerImporter) Metadata() conduit.Metadata {
return algodFollowerImporterMetadata
}

// package-wide init function
func init() {
importers.Register(importerName, importers.ImporterConstructorFunc(func() importers.Importer {
return &algodFollowerImporter{}
}))
}

func (af *algodFollowerImporter) Init(ctx context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) (*sdk.Genesis, error) {
af.ctx, af.cancel = context.WithCancel(ctx)
af.logger = logger
err := cfg.UnmarshalConfig(&af.cfg)
if err != nil {
return nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}
var client *algod.Client
u, err := url.Parse(af.cfg.NetAddr)
if err != nil {
return nil, err
}

if u.Scheme != "http" && u.Scheme != "https" {
af.cfg.NetAddr = "http://" + af.cfg.NetAddr
af.logger.Infof("Algod Importer added http prefix to NetAddr: %s", af.cfg.NetAddr)
}
client, err = algod.MakeClient(af.cfg.NetAddr, af.cfg.Token)
if err != nil {
return nil, err
}
af.aclient = client

genesisResponse, err := client.GetGenesis().Do(ctx)
if err != nil {
return nil, err
}

genesis := sdk.Genesis{}

err = json.Decode([]byte(genesisResponse), &genesis)
if err != nil {
return nil, err
}

return &genesis, err
}

func (af *algodFollowerImporter) Config() string {
s, _ := yaml.Marshal(af.cfg)
return string(s)
}

func (af *algodFollowerImporter) Close() error {
if af.cancel != nil {
af.cancel()
}
return nil
}

func (af *algodFollowerImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var err error
var blk data.BlockData

for retries := 0; retries < 3; retries++ {
// nextRound - 1 because the endpoint waits until the subsequent block is committed to return
_, err = af.aclient.StatusAfterBlock(rnd - 1).Do(af.ctx)
if err != nil {
// If context has expired.
if af.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
af.logger.Errorf(
"r=%d error getting status %d", retries, rnd)
continue
}
start := time.Now()
blockbytes, err = af.aclient.BlockRaw(rnd).Do(af.ctx)
dt := time.Since(start)
GetAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
return blk, err
}
tmpBlk := new(rpcs.EncodedBlockCert)
err = protocol.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

// We aren't going to do anything with the new delta until we get everything
// else converted over
// Round 0 has no delta associated with it
if rnd != 0 {
_, err = af.aclient.GetLedgerStateDelta(rnd).Do(af.ctx)
if err != nil {
return blk, err
}
}

blk = data.BlockData{
BlockHeader: tmpBlk.Block.BlockHeader,
Payset: tmpBlk.Block.Payset,
Certificate: &tmpBlk.Certificate,
}
return blk, err
}
af.logger.Error("GetBlock finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
return blk, fmt.Errorf("finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
}

func (af *algodFollowerImporter) ProvideMetrics() []prometheus.Collector {
return []prometheus.Collector{
GetAlgodRawBlockTimeSeconds,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package algodfollower

//go:generate conduit-docs ../../../../conduit-docs/

//Name: conduit_importers_algod_follower

// Config specific to the sync mode importer
type Config struct {
// <code>netaddr</code> is the Algod network address. It must be either an <code>http</code> or <code>https</code> URL.
NetAddr string `yaml:"netaddr"`
// <code>token</code> is the Algod API endpoint token.
Token string `yaml:"token"`
}
159 changes: 159 additions & 0 deletions conduit/plugins/importers/algodfollower/algodfollower_importer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package algodfollower

import (
"context"
"os"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"gopkg.in/yaml.v3"

"github.com/algorand/indexer/conduit/plugins"
"github.com/algorand/indexer/conduit/plugins/importers"
"github.com/algorand/indexer/util/test"
)

var (
logger *logrus.Logger
ctx context.Context
cancel context.CancelFunc
testImporter importers.Importer
)

func init() {
logger = logrus.New()
logger.SetOutput(os.Stdout)
logger.SetLevel(logrus.InfoLevel)
ctx, cancel = context.WithCancel(context.Background())
}

// TestImporterMetadata tests that metadata is correctly set
func TestImporterMetadata(t *testing.T) {
testImporter = New()
metadata := testImporter.Metadata()
assert.Equal(t, metadata.Name, algodFollowerImporterMetadata.Name)
assert.Equal(t, metadata.Description, algodFollowerImporterMetadata.Description)
assert.Equal(t, metadata.Deprecated, algodFollowerImporterMetadata.Deprecated)
}

// TestCloseSuccess tests that closing results in no error
func TestCloseSuccess(t *testing.T) {
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
err = testImporter.Close()
assert.NoError(t, err)
}

// TestInitSuccess tests that initializing results in no error
func TestInitSuccess(t *testing.T) {
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)
testImporter.Close()
}

// TestInitUnmarshalFailure tests config marshaling failures
func TestInitUnmarshalFailure(t *testing.T) {
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("`"), logger)
assert.Error(t, err)
assert.ErrorContains(t, err, "connect failure in unmarshalConfig")
testImporter.Close()
}

// TestConfigDefault tests that configuration is correct by default
func TestConfigDefault(t *testing.T) {
testImporter = New()
expected, err := yaml.Marshal(&Config{})
if err != nil {
t.Fatalf("unable to Marshal default algodimporter.Config: %v", err)
}
assert.Equal(t, string(expected), testImporter.Config())
}

// TestWaitForBlockBlockFailure tests that GetBlock results in a failure
func TestWaitForBlockBlockFailure(t *testing.T) {
ts := test.NewAlgodServer(test.GenesisResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

blk, err := testImporter.GetBlock(uint64(10))
assert.Error(t, err)
assert.True(t, blk.Empty())
}

// TestGetBlockSuccess tests that GetBlock results in success
func TestGetBlockSuccess(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockResponder,
test.BlockAfterResponder, test.LedgerStateDeltaResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

downloadedBlk, err := testImporter.GetBlock(uint64(10))
assert.NoError(t, err)
assert.Equal(t, downloadedBlk.Round(), uint64(10))
assert.True(t, downloadedBlk.Empty())
cancel()
}

// TestGetBlockContextCancelled results in an error if the context is cancelled
func TestGetBlockContextCancelled(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockResponder,
test.BlockAfterResponder, test.LedgerStateDeltaResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

cancel()
_, err = testImporter.GetBlock(uint64(10))
assert.Error(t, err)
}

// TestGetBlockFailureBlockResponder tests that GetBlock results in an error due to a lack of block responsiveness
func TestGetBlockFailureBlockResponder(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockAfterResponder, test.LedgerStateDeltaResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

_, err = testImporter.GetBlock(uint64(10))
assert.Error(t, err)
cancel()
}

// TestGetBlockFailureLedgerStateDeltaResponder tests that GetBlock results in an error due to a lack of ledger state delta
func TestGetBlockFailureLedgerStateDeltaResponder(t *testing.T) {
ctx, cancel = context.WithCancel(context.Background())
ts := test.NewAlgodServer(
test.GenesisResponder,
test.BlockResponder,
test.BlockAfterResponder)
testImporter = New()
_, err := testImporter.Init(ctx, plugins.MakePluginConfig("netaddr: "+ts.URL), logger)
assert.NoError(t, err)
assert.NotEqual(t, testImporter, nil)

_, err = testImporter.GetBlock(uint64(10))
assert.Error(t, err)
cancel()
}
18 changes: 18 additions & 0 deletions conduit/plugins/importers/algodfollower/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package algodfollower

import "github.com/prometheus/client_golang/prometheus"

// Prometheus metric names
const (
GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec"
)

// Initialize the prometheus objects
var (
GetAlgodRawBlockTimeSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: "algod_follower",
Name: GetAlgodRawBlockTimeName,
Help: "Total response time from Algod's raw block endpoint in seconds.",
})
)
Loading