Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfixes: small tidy ups and bug fixes #13

Merged
merged 3 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
httpClient.Backoff = retryablehttp.LinearJitterBackoff
return &client{
log: log,
log: log.With("module", "duneapi"),
httpClient: httpClient,
cfg: cfg,
compressor: comp,
Expand All @@ -63,12 +63,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
// SendBlock sends a block to DuneAPI
// TODO: support batching multiple blocks in a single request
func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
defer func() {
c.bufPool.Put(buffer)
c.log.Info("SendBlock", "payloadLength", len(payload.Payload), "duration", time.Since(start))
}()
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload, buffer)
if err != nil {
Expand All @@ -82,15 +78,14 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl

if c.cfg.DisableCompression {
request.Payload = payload.Payload
request.ContentType = "application/x-ndjson"
} else {
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
if err != nil {
return request, err
}
request.ContentType = "application/zstd"
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
Expand All @@ -111,24 +106,29 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
"blockNumber", request.BlockNumber,
"error", err,
"statusCode", responseStatus,
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
} else {
c.log.Info("BLOCK INGESTED",
c.log.Info("BLOCK SENT",
"blockNumber", request.BlockNumber,
"response", response.String(),
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName)
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(request.Payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", request.ContentType)
if request.ContentEncoding != "" {
req.Header.Set("Content-Encoding", request.ContentEncoding)
}
req.Header.Set("Content-Type", "application/x-ndjson")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it going to be always JSON ? Or JSON just for now ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for now, but great question

req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
Expand Down
20 changes: 12 additions & 8 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package duneapi

import (
"fmt"
"sort"

"github.com/duneanalytics/blockchain-ingester/models"
)
Expand All @@ -27,18 +28,21 @@ type BlockchainIngestResponse struct {

type IngestedTableInfo struct {
Name string `json:"name"`
Rows int `json:"rows"`
Bytes int `json:"bytes"`
Rows int `json:"rows_written"`
Bytes int `json:"bytes_written"`
}

func (b *BlockchainIngestResponse) String() string {
return fmt.Sprintf("Ingested: %+v", b.Tables)
sort.Slice(b.Tables, func(i, j int) bool {
return b.Tables[i].Name < b.Tables[j].Name
})
return fmt.Sprintf("%+v", b.Tables)
}

type BlockchainIngestRequest struct {
BlockNumber int64
ContentType string
EVMStack string
IdempotencyKey string
Payload []byte
BlockNumber int64
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
}
7 changes: 5 additions & 2 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"net/http"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/lib/hexutils"
"github.com/duneanalytics/blockchain-ingester/models"
Expand All @@ -21,7 +22,8 @@ type BlockchainClient interface {
}

const (
MaxRetries = 10
MaxRetries = 10
DefaultRequestTimeout = 30 * time.Second
)

type rpcClient struct {
Expand All @@ -37,6 +39,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
client.Logger = log
client.CheckRetry = retryablehttp.DefaultRetryPolicy
client.Backoff = retryablehttp.LinearJitterBackoff
client.HTTPClient.Timeout = DefaultRequestTimeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

rpc := &rpcClient{
client: client,
cfg: cfg,
Expand Down Expand Up @@ -81,7 +84,7 @@ func (c *rpcClient) LatestBlockNumber() (int64, error) {

// getResponseBody sends a request to the server and returns the response body
func (c *rpcClient) getResponseBody(
ctx context.Context, method string, params interface{}, output *bytes.Buffer,
ctx context.Context, method string, params []interface{}, output *bytes.Buffer,
) error {
reqData := map[string]interface{}{
"jsonrpc": "2.0",
Expand Down
4 changes: 2 additions & 2 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type OpStackClient struct {
var _ BlockchainClient = &OpStackClient{}

func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
rpcClient, err := NewClient(log, cfg)
rpcClient, err := NewClient(log.With("module", "jsonrpc"), cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
// copy the responses in order
var buffer bytes.Buffer
for _, res := range results {
buffer.Grow(res.Len() + 1)
buffer.Grow(res.Len())
buffer.ReadFrom(res)
}
return models.RPCBlock{
Expand Down
5 changes: 3 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
MaxBatchSize: 1,
MaxBatchSize: 1,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
},
)

Expand Down
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type DuneClient struct {
APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com/api"`
URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"`
}

func (d DuneClient) HasError() error {
Expand All @@ -32,13 +32,14 @@ func (r RPCClient) HasError() error {
}

type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
21 changes: 16 additions & 5 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,16 @@ type Ingester interface {
Info() Info
}

const defaultMaxBatchSize = 1
const (
defaultMaxBatchSize = 1
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
)

type Config struct {
MaxBatchSize int
PollInterval time.Duration
MaxBatchSize int
PollInterval time.Duration
ReportProgressInterval time.Duration
}

type Info struct {
Expand All @@ -61,7 +66,7 @@ type ingester struct {

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
ing := &ingester{
log: log,
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
Expand All @@ -73,9 +78,15 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai
if ing.cfg.MaxBatchSize == 0 {
ing.cfg.MaxBatchSize = defaultMaxBatchSize
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
}
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
return ing
}

func (i *ingester) Info() Info {
return Info{}
return i.info
}
Binary file added ingester/main
Binary file not shown.
46 changes: 29 additions & 17 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,33 @@ func (i *ingester) ConsumeBlocks(
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
return latestBlockNumber
default:
case <-time.After(i.cfg.PollInterval):
}
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
i.log.Debug(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

-> waited? 😄

"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
)
time.Sleep(i.cfg.PollInterval)
latestBlockNumber = i.tryUpdateLatestBlockNumber()
}
return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return err
}
Comment on lines +81 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nice, this fixed the racy test perhaps?


i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
Expand All @@ -90,10 +94,6 @@ func (i *ingester) ConsumeBlocks(
Error: err,
})

if errors.Is(err, context.Canceled) {
return err
}

// TODO: should I sleep (backoff) here?
continue
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (i *ingester) tryUpdateLatestBlockNumber() int64 {
}

func (i *ingester) ReportProgress(ctx context.Context) error {
timer := time.NewTicker(20 * time.Second)
timer := time.NewTicker(i.cfg.ReportProgressInterval)
defer timer.Stop()

previousTime := time.Now()
Expand All @@ -173,20 +173,32 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
case tNow := <-timer.C:
latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
lastConsumed := atomic.LoadInt64(&i.info.ConsumedBlockNumber)

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable

i.log.Info("Info",
rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
fields := []interface{}{
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
"latestBlockNumber", latest,
"ingestedBlockNumber", lastIngested,
"consumedBlockNumber", lastConsumed,
"distanceFromLatest", latest-lastIngested,
"FallingBehind", fallingBehind,
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
)
}
if fallingBehind {
fields = append(fields, "fallingBehind", fallingBehind)
}
if newDistance > 1 {
fields = append(fields, "distanceFromLatest", newDistance)
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
}
if duneErrors > 0 {
fields = append(fields, "duneErrors", duneErrors)
}

i.log.Info("ProgressReport", fields...)
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow
Expand Down
Loading