diff --git a/client/duneapi/client.go b/client/duneapi/client.go
index 18ca280..492db55 100644
--- a/client/duneapi/client.go
+++ b/client/duneapi/client.go
@@ -153,7 +153,7 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
 			"duration", time.Since(start),
 		)
 	} else {
-		c.log.Info("INGEST SUCCESS",
+		c.log.Debug("INGEST SUCCESS",
 			"blockNumbers", request.BlockNumbers,
 			"response", response.String(),
 			"payloadSize", len(request.Payload),
@@ -195,6 +195,11 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
 		return err
 	}
 
+	err = json.NewDecoder(resp.Body).Decode(&response)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go
index 3ce32a0..0029735 100644
--- a/client/jsonrpc/opstack.go
+++ b/client/jsonrpc/opstack.go
@@ -37,7 +37,7 @@ func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
 func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
 	tStart := time.Now()
 	defer func() {
-		c.log.Info("BlockByNumber",
+		c.log.Debug("BlockByNumber",
 			"blockNumber", blockNumber,
 			"duration", time.Since(tStart),
 		)
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index 0de32cf..a47d393 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -95,28 +95,18 @@ func (i *ingester) ProduceBlockNumbers(
 	// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
 	dontStop := endBlockNumber < startBlockNumber
-	i.log.Info("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
+	i.log.Debug("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
 	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
 		latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
 		select {
 		case <-ctx.Done():
-			i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
+			i.log.Debug("ProduceBlockNumbers: Context canceled, stopping")
 			return ctx.Err()
 		case blockNumbers <- blockNumber:
 		}
-
-		distanceFromLatest := latestBlockNumber - blockNumber
-		if distanceFromLatest > 0 {
-			// TODO: improve logs of processing speed and catchup estimated ETA
-			i.log.Info("We're behind, trying to catch up..",
-				"blockNumber", blockNumber,
-				"latestBlockNumber", latestBlockNumber,
-				"distanceFromLatest", distanceFromLatest,
-			)
-		}
 	}
-	i.log.Info("Finished producing block numbers")
+	i.log.Debug("Finished producing block numbers")
 	return ErrFinishedFetchBlockLoop
 }
@@ -135,7 +125,7 @@ func (i *ingester) FetchBlockLoop(
 		block, err := i.node.BlockByNumber(ctx, blockNumber)
 		if err != nil {
 			if errors.Is(err, context.Canceled) {
-				i.log.Info("FetchBlockLoop: Context canceled, stopping")
+				i.log.Error("FetchBlockLoop: Context canceled, stopping")
 				return ctx.Err()
 			}
 
@@ -156,10 +146,10 @@ func (i *ingester) FetchBlockLoop(
 		getBlockElapsed := time.Since(startTime)
 		select {
 		case <-ctx.Done():
-			i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
+			i.log.Debug("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
 			return ctx.Err()
 		case blocks <- block:
-			i.log.Info(
+			i.log.Debug(
 				"FetchBlockLoop: Got and sent block",
 				"blockNumber", blockNumber,
 				"getBlockElapsed", getBlockElapsed,
@@ -178,16 +168,16 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 	batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
 	defer batchTimer.Stop()
 
-	i.log.Info("SendBlocks: Starting to receive blocks")
+	i.log.Debug("SendBlocks: Starting to receive blocks")
 	for {
 		// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
 		select {
 		case <-ctx.Done():
-			i.log.Info("SendBlocks: Context canceled, stopping")
+			i.log.Debug("SendBlocks: Context canceled, stopping")
 			return ctx.Err()
 		case block, ok := <-blocks:
 			if !ok {
-				i.log.Info("SendBlocks: Channel is closed, returning")
+				i.log.Debug("SendBlocks: Channel is closed, returning")
 				return nil
 			}
 
@@ -198,11 +188,11 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 					Error: block.Error,
 				})
 
-				i.log.Info("Received FAILED block", "number", block.BlockNumber)
+				i.log.Error("Received FAILED block", "number", block.BlockNumber)
 			}
 
 			collectedBlocks[block.BlockNumber] = block
-			i.log.Info(
+			i.log.Debug(
 				"SendBlocks: Received block",
 				"blockNumber", block.BlockNumber,
 				"bufferSize", len(collectedBlocks),
@@ -213,7 +203,6 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 			if err != nil {
 				return errors.Errorf("send blocks: %w", err)
 			}
-			i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
 		}
 	}
 }
@@ -224,23 +213,23 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 func (i *ingester) trySendCompletedBlocks(
 	ctx context.Context,
 	collectedBlocks map[int64]models.RPCBlock,
-	nextNumberToSend int64,
+	nextBlockToSend int64,
 ) (int64, error) {
 	// Outer loop: We might need to send multiple batch requests if our buffer is too big
-	for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] {
+	for _, ok := collectedBlocks[nextBlockToSend]; ok; _, ok = collectedBlocks[nextBlockToSend] {
 		// Collect a blocks of blocks to send, only send those which are in order
 		// Collect a batch to send, only send those which are in order
 		blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
-		for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] {
+		for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
 			// Skip Failed block if we're configured to skip Failed blocks
 			if i.cfg.SkipFailedBlocks && block.Errored() {
-				nextNumberToSend++
+				nextBlockToSend++
 				continue
 			}
 			blockBatch = append(blockBatch, block)
-			delete(collectedBlocks, nextNumberToSend)
-			nextNumberToSend++
+			delete(collectedBlocks, nextBlockToSend)
+			nextBlockToSend++
 
 			if len(blockBatch) == maxBatchSize {
 				break
@@ -248,34 +237,32 @@ func (i *ingester) trySendCompletedBlocks(
 		}
 
 		if len(blockBatch) == 0 {
-			return nextNumberToSend, nil
+			return nextBlockToSend, nil
 		}
 
 		// Send the batch
 		lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
-		if lastBlockNumber != nextNumberToSend-1 {
+		if lastBlockNumber != nextBlockToSend-1 {
 			panic("unexpected last block number")
 		}
 		if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
 			if errors.Is(err, context.Canceled) {
 				i.log.Info("SendBlocks: Context canceled, stopping")
-				return nextNumberToSend, nil
+				return nextBlockToSend, nil
 			}
 			// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
 			err := errors.Errorf("failed to send batch: %w", err)
 			i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
-			return nextNumberToSend, err
+			return nextBlockToSend, err
 		}
-		i.log.Info(
-			"SendBlocks: Sent batch, updating latest ingested block number",
-			"blockNumberFirst", blockBatch[0].BlockNumber,
-			"blockNumberLast", lastBlockNumber,
-			"batchSize", len(blockBatch),
-		)
 		atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
 	}
-
-	return nextNumberToSend, nil
+	i.log.Info(
+		"Sent completed blocks to DuneAPI",
+		"bufferSize", len(collectedBlocks),
+		"nextBlockToSend", nextBlockToSend,
+	)
+	return nextBlockToSend, nil
 }
 
 func (i *ingester) tryUpdateLatestBlockNumber() int64 {