Skip to content

Commit

Permalink
Ingester main loop (#5)
Browse files Browse the repository at this point in the history
This is the main loop logic, it connects the:
 - rpc node client that is getting blocks
 - dune api client, that is sending blocks to dune.

it has a few responsabilities:
- track errors on either side (and later do something about it)
- track if it is behind or up to date
- try to go fast.
- produce good logs of what's going on
  • Loading branch information
msf committed Jun 6, 2024
1 parent b8897af commit aac75cb
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 79 deletions.
29 changes: 1 addition & 28 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package duneapi

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
Expand All @@ -19,18 +18,9 @@ const (
)

type BlockchainIngester interface {
// Sync pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error

// SendBlock sends a block to DuneAPI
SendBlock(payload models.RPCBlock) error

// TODO:
// - Batching multiple blocks in a single request
// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -69,25 +59,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}, nil
}

func (c *client) Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case payload, ok := <-blocksCh:
if !ok {
return nil // channel closed
}
if err := c.SendBlock(payload); err != nil {
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
c.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
}
}
}
}

// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
Expand Down
16 changes: 8 additions & 8 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ import (
type BlockchainClient interface {
LatestBlockNumber() (int64, error)
BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error)

// SendBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
SendBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error

Close() error
}

Expand All @@ -37,13 +31,13 @@ type rpcClient struct {
bufPool *sync.Pool
}

func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-line:unexported-return
func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:disable-line:unexported-return
client := retryablehttp.NewClient()
client.RetryMax = MaxRetries
client.Logger = log
client.CheckRetry = retryablehttp.DefaultRetryPolicy
client.Backoff = retryablehttp.LinearJitterBackoff
return &rpcClient{
rpc := &rpcClient{
client: client,
cfg: cfg,
log: log,
Expand All @@ -53,6 +47,12 @@ func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-l
},
},
}
// lets validate RPC node is up & reachable
_, err := rpc.LatestBlockNumber()
if err != nil {
return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
}
return rpc, nil
}

func (c *rpcClient) LatestBlockNumber() (int64, error) {
Expand Down
39 changes: 14 additions & 25 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
"golang.org/x/sync/errgroup"
Expand All @@ -16,9 +17,12 @@ type OpStackClient struct {

var _ BlockchainClient = &OpStackClient{}

func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
rpcClient := NewRPCClient(cfg, log)
return &OpStackClient{*rpcClient}
func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
rpcClient, err := NewClient(log, cfg)
if err != nil {
return nil, err
}
return &OpStackClient{*rpcClient}, nil
}

// BlockByNumber returns the block with the given blockNumber.
Expand All @@ -31,6 +35,13 @@ func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
//
// we should handle the case where it is not available
func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
tStart := time.Now()
defer func() {
c.log.Info("BlockByNumber",
"blockNumber", blockNumber,
"duration", time.Since(tStart),
)
}()
blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

// TODO: split this into mandatory and optional methods
Expand Down Expand Up @@ -79,25 +90,3 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
Payload: buffer.Bytes(),
}, nil
}

func (c *OpStackClient) SendBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
dontStop := endBlockNumber <= startBlockNumber
for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ {
block, err := c.BlockByNumber(ctx, blockNumber)
if err != nil {
c.log.Error("Failed to get block by number",
"blockNumber", blockNumber,
"error", err,
)
return err
}
select {
case <-ctx.Done():
return nil
case outChan <- block:
}
}
return nil
}
40 changes: 28 additions & 12 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/config"
"github.com/duneanalytics/blockchain-ingester/ingester"
"github.com/duneanalytics/blockchain-ingester/models"
)

func init() {
Expand Down Expand Up @@ -42,21 +45,34 @@ func main() {
defer duneClient.Close()

var wg stdsync.WaitGroup
var rpcClient jsonrpc.BlockchainClient

// rpcClient, err := jsonrpc.NewClient(&rpc.Config{
// NodeURL: cfg.BlockchainNodeURL,
// PoolInterval: cfg.PoolInterval,
// Stack: cfg.BlockchainStack,
// })
switch cfg.RPCStack {
case models.OpStack:
rpcClient, err = jsonrpc.NewOpStackClient(logger, jsonrpc.Config{
URL: cfg.RPCNode.NodeURL,
})
default:
stdlog.Fatalf("unsupported RPC stack: %s", cfg.RPCStack)
}
if err != nil {
stdlog.Fatal(err)
}

ingester := ingester.New(
logger,
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
StartBlockHeight: cfg.BlockHeight,
},
)

// harvester, err := harvester.New(&harvester.Config{
// Logger: logger,
// DuneClient: duneClient,
// RPCClient: rpcClient,
// })
wg.Add(1)
ingester.Run(context.Background(), &wg)

// wg.Add(1)
// harvester.Run(ctx, &wg)
// TODO: add a metrics exporter or healthcheck http endpoint ?

_, cancelFn := context.WithCancel(context.Background())
quit := make(chan os.Signal, 1)
Expand Down
11 changes: 5 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ func (d DuneClient) HasError() error {
}

type RPCClient struct {
NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
PoolInterval time.Duration `long:"rpc-pool-interval" env:"RPC_POOL_INTERVAL" description:"Interval to pool the blockchain node" default:"500millis"` // nolint:lll
NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
}

func (r RPCClient) HasError() error {
Expand All @@ -33,12 +32,12 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500millis"` // nolint:lll
RPCNode RPCClient
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
BatchSize int `long:"batch-size" env:"BATCH_SIZE" description:"number of blocks to submit to Dune" default:"3"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
82 changes: 82 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package ingester

import (
"context"
"log/slog"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
)

type Ingester interface {
Run(ctx context.Context, wg *sync.WaitGroup) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error

// This is just a placeholder for now
Info() Info
}

const defaultMaxBatchSize = 1

type Config struct {
MaxBatchSize int
StartBlockHeight int64
PollInterval time.Duration
}

type Info struct {
LatestBlockNumber int64
IngestedBlockNumber int64
ConsumedBlockNumber int64
RPCErrors []ErrorInfo
DuneErrors []ErrorInfo
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumber int64
Error error
}

type ingester struct {
log *slog.Logger
node jsonrpc.BlockchainClient
dune duneapi.BlockchainIngester
cfg Config
info Info
}

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
ing := &ingester{
log: log,
node: node,
dune: dune,
cfg: cfg,
info: Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
},
}
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = defaultMaxBatchSize
}
return ing
}

func (i *ingester) Info() Info {
return Info{}
}
Loading

0 comments on commit aac75cb

Please sign in to comment.