Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Feature: Adds the sync mode importer #1415

Merged
6 changes: 3 additions & 3 deletions api/handlers_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ import (
"testing"
"time"

"github.com/algorand/avm-abi/apps"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/avm-abi/apps"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/crypto"
"github.com/algorand/go-algorand/crypto/merklesignature"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/indexer/api/generated/v2"
"github.com/algorand/indexer/idb"
"github.com/algorand/indexer/idb/postgres"
Expand Down
2 changes: 1 addition & 1 deletion conduit/plugins/importers/algod/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
var (
GetAlgodRawBlockTimeSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: "indexer_daemon",
Subsystem: "algod_importer",
Name: GetAlgodRawBlockTimeName,
Help: "Total response time from Algod's raw block endpoint in seconds.",
})
Expand Down
18 changes: 18 additions & 0 deletions conduit/plugins/importers/syncalgod/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package syncalgod

import "github.com/prometheus/client_golang/prometheus"

// Prometheus metric names
const (
GetAlgodRawBlockTimeName = "get_algod_raw_block_time_sec"
)

// Initialize the prometheus objects
var (
GetAlgodRawBlockTimeSeconds = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: "sync_algod",
Name: GetAlgodRawBlockTimeName,
Help: "Total response time from Algod's raw block endpoint in seconds.",
})
)
6 changes: 6 additions & 0 deletions conduit/plugins/importers/syncalgod/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
name: sync_algod
config:
# Algod netaddr string
netaddr: "http://url"
# Algod rest endpoint token
token: ""
169 changes: 169 additions & 0 deletions conduit/plugins/importers/syncalgod/syncalgod_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package syncalgod

import (
"context"
_ "embed"
"fmt"
"net/url"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"

"github.com/algorand/indexer/conduit"
"github.com/algorand/indexer/conduit/plugins"
"github.com/algorand/indexer/conduit/plugins/importers"
"github.com/algorand/indexer/data"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/rpcs"
)

const importerName = "sync_algod"
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved

type syncModeImporter struct {
aclient *algod.Client
logger *logrus.Logger
cfg Config
ctx context.Context
cancel context.CancelFunc
}

func (sm *syncModeImporter) OnComplete(input data.BlockData) error {
_, err := sm.aclient.SetSyncRound(input.Round() + 1).Do(sm.ctx)
return err
}

//go:embed sample.yaml
var sampleConfig string

var syncAlgodModeImporterMetadata = conduit.Metadata{
Name: importerName,
Description: "Importer for fetching blocks from an algod REST API using sync round and ledger delta algod calls.",
Deprecated: false,
SampleConfig: sampleConfig,
}

// New initializes an algod importer
func New() importers.Importer {
return &syncModeImporter{}
}

func (sm *syncModeImporter) Metadata() conduit.Metadata {
return syncAlgodModeImporterMetadata
}

// package-wide init function
func init() {
importers.Register(importerName, importers.ImporterConstructorFunc(func() importers.Importer {
return &syncModeImporter{}
}))
}

func (sm *syncModeImporter) Init(ctx context.Context, cfg plugins.PluginConfig, logger *logrus.Logger) (*sdk.Genesis, error) {
sm.ctx, sm.cancel = context.WithCancel(ctx)
sm.logger = logger
err := cfg.UnmarshalConfig(&sm.cfg)
if err != nil {
return nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
}
var client *algod.Client
u, err := url.Parse(sm.cfg.NetAddr)
if err != nil {
return nil, err
}

if u.Scheme != "http" && u.Scheme != "https" {
sm.cfg.NetAddr = "http://" + sm.cfg.NetAddr
sm.logger.Infof("Algod Importer added http prefix to NetAddr: %s", sm.cfg.NetAddr)
}
client, err = algod.MakeClient(sm.cfg.NetAddr, sm.cfg.Token)
if err != nil {
return nil, err
}
sm.aclient = client

genesisResponse, err := client.GetGenesis().Do(ctx)
if err != nil {
return nil, err
}

genesis := sdk.Genesis{}

err = json.Decode([]byte(genesisResponse), &genesis)
if err != nil {
return nil, err
}

return &genesis, err
}

func (sm *syncModeImporter) Config() string {
s, _ := yaml.Marshal(sm.cfg)
return string(s)
}

func (sm *syncModeImporter) Close() error {
if sm.cancel != nil {
sm.cancel()
}
return nil
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved
}

func (sm *syncModeImporter) GetBlock(rnd uint64) (data.BlockData, error) {
var blockbytes []byte
var err error
var blk data.BlockData

for retries := 0; retries < 3; retries++ {
// nextRound - 1 because the endpoint waits until the subsequent block is committed to return
_, err = sm.aclient.StatusAfterBlock(rnd - 1).Do(sm.ctx)
if err != nil {
// If context has expired.
if sm.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
sm.logger.Errorf(
"r=%d error getting status %d", retries, rnd)
continue
}
start := time.Now()
blockbytes, err = sm.aclient.BlockRaw(rnd).Do(sm.ctx)
dt := time.Since(start)
GetAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
return blk, err
}
tmpBlk := new(rpcs.EncodedBlockCert)
err = protocol.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
}

// We aren't going to do anything with the new delta until we get everything
// else converted over
_, err = sm.aclient.GetLedgerStateDelta(rnd).Do(sm.ctx)
if err != nil {
return blk, err
}
AlgoStephenAkiki marked this conversation as resolved.
Show resolved Hide resolved

blk = data.BlockData{
BlockHeader: tmpBlk.Block.BlockHeader,
Payset: tmpBlk.Block.Payset,
Certificate: &tmpBlk.Certificate,
}
return blk, err
}
sm.logger.Error("GetBlock finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
return blk, fmt.Errorf("finished retries without fetching a block. Check that the indexer is set to start at a round that the current algod node can handle")
}

func (sm *syncModeImporter) ProvideMetrics() []prometheus.Collector {
return []prometheus.Collector{
GetAlgodRawBlockTimeSeconds,
}
}
13 changes: 13 additions & 0 deletions conduit/plugins/importers/syncalgod/syncalgod_importer_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package syncalgod

//go:generate conduit-docs ../../../../conduit-docs/

//Name: conduit_importers_syncmode

// Config specific to the sync mode importer
type Config struct {
// <code>netaddr</code> is the Algod network address. It must be either an <code>http</code> or <code>https</code> URL.
NetAddr string `yaml:"netaddr"`
// <code>token</code> is the Algod API endpoint token.
Token string `yaml:"token"`
}
Loading