diff --git a/client/duneapi/client.go b/client/duneapi/client.go index c4fbd17..a7dc13f 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -48,7 +48,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy httpClient.Backoff = retryablehttp.LinearJitterBackoff return &client{ - log: log, + log: log.With("module", "duneapi"), httpClient: httpClient, cfg: cfg, compressor: comp, @@ -63,12 +63,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line // SendBlock sends a block to DuneAPI // TODO: support batching multiple blocks in a single request func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error { - start := time.Now() buffer := c.bufPool.Get().(*bytes.Buffer) - defer func() { - c.bufPool.Put(buffer) - c.log.Info("SendBlock", "payloadLength", len(payload.Payload), "duration", time.Since(start)) - }() + defer c.bufPool.Put(buffer) request, err := c.buildRequest(payload, buffer) if err != nil { @@ -82,7 +78,6 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl if c.cfg.DisableCompression { request.Payload = payload.Payload - request.ContentType = "application/x-ndjson" } else { buffer.Reset() c.compressor.Reset(buffer) @@ -90,7 +85,7 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl if err != nil { return request, err } - request.ContentType = "application/zstd" + request.ContentEncoding = "application/zstd" request.Payload = buffer.Bytes() } request.BlockNumber = payload.BlockNumber @@ -111,24 +106,29 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques "blockNumber", request.BlockNumber, "error", err, "statusCode", responseStatus, + "payloadSize", len(request.Payload), "duration", time.Since(start), ) } else { - c.log.Info("BLOCK INGESTED", + c.log.Info("BLOCK SENT", "blockNumber", request.BlockNumber, "response", response.String(), + "payloadSize", len(request.Payload), "duration", time.Since(start), ) } }() - url := fmt.Sprintf("%s/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName) + url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest", c.cfg.URL, c.cfg.BlockchainName) c.log.Debug("Sending request", "url", url) req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(request.Payload)) if err != nil { return err } - req.Header.Set("Content-Type", request.ContentType) + if request.ContentEncoding != "" { + req.Header.Set("Content-Encoding", request.ContentEncoding) + } + req.Header.Set("Content-Type", "application/x-ndjson") req.Header.Set("x-idempotency-key", request.IdempotencyKey) req.Header.Set("x-dune-evm-stack", request.EVMStack) req.Header.Set("x-dune-api-key", c.cfg.APIKey) diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 774d1ad..31cc90a 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -2,6 +2,7 @@ package duneapi import ( "fmt" + "sort" "github.com/duneanalytics/blockchain-ingester/models" ) @@ -27,18 +28,21 @@ type BlockchainIngestResponse struct { type IngestedTableInfo struct { Name string `json:"name"` - Rows int `json:"rows"` - Bytes int `json:"bytes"` + Rows int `json:"rows_written"` + Bytes int `json:"bytes_written"` } func (b *BlockchainIngestResponse) String() string { - return fmt.Sprintf("Ingested: %+v", b.Tables) + sort.Slice(b.Tables, func(i, j int) bool { + return b.Tables[i].Name < b.Tables[j].Name + }) + return fmt.Sprintf("%+v", b.Tables) } type BlockchainIngestRequest struct { - BlockNumber int64 - ContentType string - EVMStack string - IdempotencyKey string - Payload []byte + BlockNumber int64 + ContentEncoding string + EVMStack string + IdempotencyKey string + Payload []byte } diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go index 1ae762b..cb117a6 100644 --- a/client/jsonrpc/client.go +++ b/client/jsonrpc/client.go @@ -8,6 +8,7 @@ import ( "log/slog" "net/http" "sync" + "time" "github.com/duneanalytics/blockchain-ingester/lib/hexutils" "github.com/duneanalytics/blockchain-ingester/models" @@ -21,7 +22,8 @@ type BlockchainClient interface { } const ( - MaxRetries = 10 + MaxRetries = 10 + DefaultRequestTimeout = 30 * time.Second ) type rpcClient struct { @@ -37,6 +39,7 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis client.Logger = log client.CheckRetry = retryablehttp.DefaultRetryPolicy client.Backoff = retryablehttp.LinearJitterBackoff + client.HTTPClient.Timeout = DefaultRequestTimeout rpc := &rpcClient{ client: client, cfg: cfg, @@ -81,7 +84,7 @@ func (c *rpcClient) LatestBlockNumber() (int64, error) { // getResponseBody sends a request to the server and returns the response body func (c *rpcClient) getResponseBody( - ctx context.Context, method string, params interface{}, output *bytes.Buffer, + ctx context.Context, method string, params []interface{}, output *bytes.Buffer, ) error { reqData := map[string]interface{}{ "jsonrpc": "2.0", diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go index e10149a..3ce32a0 100644 --- a/client/jsonrpc/opstack.go +++ b/client/jsonrpc/opstack.go @@ -18,7 +18,7 @@ type OpStackClient struct { var _ BlockchainClient = &OpStackClient{} func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) { - rpcClient, err := NewClient(log, cfg) + rpcClient, err := NewClient(log.With("module", "jsonrpc"), cfg) if err != nil { return nil, err } @@ -81,7 +81,7 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m // copy the responses in order var buffer bytes.Buffer for _, res := range results { - buffer.Grow(res.Len() + 1) + buffer.Grow(res.Len()) buffer.ReadFrom(res) } return models.RPCBlock{ diff --git a/cmd/main.go b/cmd/main.go index 1a10c98..d5c2e33 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -65,8 +65,9 @@ func main() { rpcClient, duneClient, ingester.Config{ - PollInterval: cfg.PollInterval, - MaxBatchSize: 1, + MaxBatchSize: 1, + ReportProgressInterval: cfg.ReportProgressInterval, + PollInterval: cfg.PollInterval, }, ) diff --git a/config/config.go b/config/config.go index 793b4a6..a0f85f4 100644 --- a/config/config.go +++ b/config/config.go @@ -10,7 +10,7 @@ import ( type DuneClient struct { APIKey string `long:"dune-api-key" env:"DUNE_API_KEY" description:"API key for DuneAPI"` - URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com/api"` + URL string `long:"dune-api-url" env:"DUNE_API_URL" description:"URL for DuneAPI" default:"https://api.dune.com"` } func (d DuneClient) HasError() error { @@ -32,13 +32,14 @@ func (r RPCClient) HasError() error { } type Config struct { - BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll - BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll - EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll - Dune DuneClient - PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll - RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll + BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll + EnableCompression bool `long:"enable-compression" env:"ENABLE_COMPRESSION" description:"enable compression when pushing payload to Dune"` // nolint:lll + Dune DuneClient + PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll + ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll + RPCNode RPCClient + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index 462ad0d..32de024 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -30,11 +30,16 @@ type Ingester interface { Info() Info } -const defaultMaxBatchSize = 1 +const ( + defaultMaxBatchSize = 1 + defaultPollInterval = 1 * time.Second + defaultReportProgressInterval = 30 * time.Second +) type Config struct { - MaxBatchSize int - PollInterval time.Duration + MaxBatchSize int + PollInterval time.Duration + ReportProgressInterval time.Duration } type Info struct { @@ -61,7 +66,7 @@ type ingester struct { func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester { ing := &ingester{ - log: log, + log: log.With("module", "ingester"), node: node, dune: dune, cfg: cfg, @@ -73,9 +78,15 @@ func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.Blockchai if ing.cfg.MaxBatchSize == 0 { ing.cfg.MaxBatchSize = defaultMaxBatchSize } + if ing.cfg.PollInterval == 0 { + ing.cfg.PollInterval = defaultPollInterval + } + if ing.cfg.ReportProgressInterval == 0 { + ing.cfg.ReportProgressInterval = defaultReportProgressInterval + } return ing } func (i *ingester) Info() Info { - return Info{} + return i.info } diff --git a/ingester/main b/ingester/main new file mode 100755 index 0000000..9d7fc5b Binary files /dev/null and b/ingester/main differ diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 0be6dcf..79d0b2d 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -56,29 +56,33 @@ func (i *ingester) ConsumeBlocks( dontStop := endBlockNumber <= startBlockNumber latestBlockNumber := i.tryUpdateLatestBlockNumber() - waitForBlock := func(blockNumber, latestBlockNumber int64) int64 { + waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 { for blockNumber > latestBlockNumber { select { case <-ctx.Done(): return latestBlockNumber - default: + case <-time.After(i.cfg.PollInterval): } - i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), + i.log.Debug(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber, ) - time.Sleep(i.cfg.PollInterval) latestBlockNumber = i.tryUpdateLatestBlockNumber() } return latestBlockNumber } for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { - latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber) + latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber) startTime := time.Now() block, err := i.node.BlockByNumber(ctx, blockNumber) if err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("Context canceled, stopping..") + return err + } + i.log.Error("Failed to get block by number, continuing..", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber, @@ -90,10 +94,6 @@ func (i *ingester) ConsumeBlocks( Error: err, }) - if errors.Is(err, context.Canceled) { - return err - } - // TODO: should I sleep (backoff) here? continue } @@ -159,7 +159,7 @@ func (i *ingester) tryUpdateLatestBlockNumber() int64 { } func (i *ingester) ReportProgress(ctx context.Context) error { - timer := time.NewTicker(20 * time.Second) + timer := time.NewTicker(i.cfg.ReportProgressInterval) defer timer.Stop() previousTime := time.Now() @@ -173,20 +173,32 @@ func (i *ingester) ReportProgress(ctx context.Context) error { case tNow := <-timer.C: latest := atomic.LoadInt64(&i.info.LatestBlockNumber) lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber) - lastConsumed := atomic.LoadInt64(&i.info.ConsumedBlockNumber) blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds() newDistance := latest - lastIngested fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable - i.log.Info("Info", + rpcErrors := len(i.info.RPCErrors) + duneErrors := len(i.info.DuneErrors) + fields := []interface{}{ + "blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec), "latestBlockNumber", latest, "ingestedBlockNumber", lastIngested, - "consumedBlockNumber", lastConsumed, - "distanceFromLatest", latest-lastIngested, - "FallingBehind", fallingBehind, - "blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec), - ) + } + if fallingBehind { + fields = append(fields, "fallingBehind", fallingBehind) + } + if newDistance > 1 { + fields = append(fields, "distanceFromLatest", newDistance) + } + if rpcErrors > 0 { + fields = append(fields, "rpcErrors", rpcErrors) + } + if duneErrors > 0 { + fields = append(fields, "duneErrors", duneErrors) + } + + i.log.Info("ProgressReport", fields...) previousIngested = lastIngested previousDistance = newDistance previousTime = tNow