Improve logging so it's much less verbose (#39)
vegarsti committed Jun 27, 2024
1 parent a4ba018 · commit dc918c7
Showing 3 changed files with 34 additions and 42 deletions.
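The substance of the change is demoting per-block log lines from Info to Debug, so they are emitted only when the handler's level is lowered. As a minimal sketch of that gating in Go's log/slog (the LevelVar wiring below is illustrative, not this repository's actual setup):

```go
package main

import (
	"log/slog"
	"os"
)

func main() {
	// Hypothetical setup: a LevelVar lets the threshold be changed at runtime.
	var lvl slog.LevelVar
	lvl.Set(slog.LevelInfo) // default: Debug lines are suppressed

	log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: &lvl}))

	log.Debug("BlockByNumber", "blockNumber", 42) // dropped at Info level
	log.Info("starting ingester")                 // still printed

	lvl.Set(slog.LevelDebug)
	log.Debug("BlockByNumber", "blockNumber", 43) // now printed
}
```

With the level left at Info, the demoted lines below disappear from normal runs; setting it to Debug restores the old verbosity.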
7 changes: 6 additions & 1 deletion client/duneapi/client.go
```diff
@@ -153,7 +153,7 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest
 			"duration", time.Since(start),
 		)
 	} else {
-		c.log.Info("INGEST SUCCESS",
+		c.log.Debug("INGEST SUCCESS",
 			"blockNumbers", request.BlockNumbers,
 			"response", response.String(),
 			"payloadSize", len(request.Payload),
@@ -195,6 +195,11 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest
 		return err
 	}
 
+	err = json.NewDecoder(resp.Body).Decode(&response)
+	if err != nil {
+		return err
+	}
+
 	return nil
 }
```
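The second hunk begins decoding the ingest response body, which is what allows the success log (now at Debug) to include response.String(). A self-contained sketch of the same decode pattern; the ingestResponse type and its fields are hypothetical, not the actual Dune API shape:

```go
package duneapi

import (
	"encoding/json"
	"net/http"
)

// ingestResponse is a made-up stand-in for the real response type.
type ingestResponse struct {
	Tables []string `json:"tables"`
	Rows   int      `json:"rows"`
}

func decodeResponse(resp *http.Response) (ingestResponse, error) {
	defer resp.Body.Close()
	var out ingestResponse
	// json.NewDecoder streams from the body rather than buffering it first.
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return ingestResponse{}, err
	}
	return out, nil
}
```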
2 changes: 1 addition & 1 deletion client/jsonrpc/opstack.go
```diff
@@ -37,7 +37,7 @@ func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
 func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
 	tStart := time.Now()
 	defer func() {
-		c.log.Info("BlockByNumber",
+		c.log.Debug("BlockByNumber",
 			"blockNumber", blockNumber,
 			"duration", time.Since(tStart),
 		)
```
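The timing instrumentation here survives unchanged apart from its level: the defer runs on every return path, so the duration is logged whether the fetch succeeds or fails. The same idiom in isolation, with illustrative names:

```go
package jsonrpc

import (
	"log/slog"
	"time"
)

// timedFetch mirrors the defer idiom above: capture the start time once,
// then log the elapsed time no matter which return path is taken.
func timedFetch(log *slog.Logger, blockNumber int64, fetch func() error) error {
	tStart := time.Now()
	defer func() {
		log.Debug("BlockByNumber",
			"blockNumber", blockNumber,
			"duration", time.Since(tStart),
		)
	}()
	return fetch()
}
```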
67 changes: 27 additions & 40 deletions ingester/mainloop.go
```diff
@@ -95,28 +95,18 @@ func (i *ingester) ProduceBlockNumbers(
 
 	// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
 	dontStop := endBlockNumber < startBlockNumber
-	i.log.Info("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
+	i.log.Debug("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
 	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
 		latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
 
 		select {
 		case <-ctx.Done():
-			i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
+			i.log.Debug("ProduceBlockNumbers: Context canceled, stopping")
 			return ctx.Err()
 		case blockNumbers <- blockNumber:
 		}
-
-		distanceFromLatest := latestBlockNumber - blockNumber
-		if distanceFromLatest > 0 {
-			// TODO: improve logs of processing speed and catchup estimated ETA
-			i.log.Info("We're behind, trying to catch up..",
-				"blockNumber", blockNumber,
-				"latestBlockNumber", latestBlockNumber,
-				"distanceFromLatest", distanceFromLatest,
-			)
-		}
 	}
-	i.log.Info("Finished producing block numbers")
+	i.log.Debug("Finished producing block numbers")
 	return ErrFinishedFetchBlockLoop
 }
 
@@ -135,7 +125,7 @@ func (i *ingester) FetchBlockLoop(
 		block, err := i.node.BlockByNumber(ctx, blockNumber)
 		if err != nil {
 			if errors.Is(err, context.Canceled) {
-				i.log.Info("FetchBlockLoop: Context canceled, stopping")
+				i.log.Error("FetchBlockLoop: Context canceled, stopping")
 				return ctx.Err()
 			}
 
@@ -156,10 +146,10 @@
 		getBlockElapsed := time.Since(startTime)
 		select {
 		case <-ctx.Done():
-			i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
+			i.log.Debug("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
 			return ctx.Err()
 		case blocks <- block:
-			i.log.Info(
+			i.log.Debug(
 				"FetchBlockLoop: Got and sent block",
 				"blockNumber", blockNumber,
 				"getBlockElapsed", getBlockElapsed,
@@ -178,16 +168,16 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 	batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
 	defer batchTimer.Stop()
 
-	i.log.Info("SendBlocks: Starting to receive blocks")
+	i.log.Debug("SendBlocks: Starting to receive blocks")
 	for {
 		// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
 		select {
 		case <-ctx.Done():
-			i.log.Info("SendBlocks: Context canceled, stopping")
+			i.log.Debug("SendBlocks: Context canceled, stopping")
 			return ctx.Err()
 		case block, ok := <-blocks:
 			if !ok {
-				i.log.Info("SendBlocks: Channel is closed, returning")
+				i.log.Debug("SendBlocks: Channel is closed, returning")
 				return nil
 			}
 
@@ -198,11 +188,11 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 					Error: block.Error,
 				})
 
-				i.log.Info("Received FAILED block", "number", block.BlockNumber)
+				i.log.Error("Received FAILED block", "number", block.BlockNumber)
 			}
 
 			collectedBlocks[block.BlockNumber] = block
-			i.log.Info(
+			i.log.Debug(
 				"SendBlocks: Received block",
 				"blockNumber", block.BlockNumber,
 				"bufferSize", len(collectedBlocks),
@@ -213,7 +203,6 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 			if err != nil {
 				return errors.Errorf("send blocks: %w", err)
 			}
-			i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
 		}
 	}
 }
@@ -224,58 +213,56 @@ func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock
 func (i *ingester) trySendCompletedBlocks(
 	ctx context.Context,
 	collectedBlocks map[int64]models.RPCBlock,
-	nextNumberToSend int64,
+	nextBlockToSend int64,
 ) (int64, error) {
 	// Outer loop: We might need to send multiple batch requests if our buffer is too big
-	for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] {
-		// Collect a blocks of blocks to send, only send those which are in order
+	for _, ok := collectedBlocks[nextBlockToSend]; ok; _, ok = collectedBlocks[nextBlockToSend] {
+		// Collect a batch to send, only send those which are in order
 		blockBatch := make([]models.RPCBlock, 0, maxBatchSize)
-		for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] {
+		for block, ok := collectedBlocks[nextBlockToSend]; ok; block, ok = collectedBlocks[nextBlockToSend] {
 			// Skip Failed block if we're configured to skip Failed blocks
 			if i.cfg.SkipFailedBlocks && block.Errored() {
-				nextNumberToSend++
+				nextBlockToSend++
 				continue
 			}
 
 			blockBatch = append(blockBatch, block)
-			delete(collectedBlocks, nextNumberToSend)
-			nextNumberToSend++
+			delete(collectedBlocks, nextBlockToSend)
+			nextBlockToSend++
 
 			if len(blockBatch) == maxBatchSize {
 				break
 			}
 		}
 
 		if len(blockBatch) == 0 {
-			return nextNumberToSend, nil
+			return nextBlockToSend, nil
 		}
 
 		// Send the batch
 		lastBlockNumber := blockBatch[len(blockBatch)-1].BlockNumber
-		if lastBlockNumber != nextNumberToSend-1 {
+		if lastBlockNumber != nextBlockToSend-1 {
 			panic("unexpected last block number")
 		}
 		if err := i.dune.SendBlocks(ctx, blockBatch); err != nil {
 			if errors.Is(err, context.Canceled) {
 				i.log.Info("SendBlocks: Context canceled, stopping")
-				return nextNumberToSend, nil
+				return nextBlockToSend, nil
 			}
 			// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries
 			err := errors.Errorf("failed to send batch: %w", err)
 			i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
-			return nextNumberToSend, err
+			return nextBlockToSend, err
 		}
-		i.log.Info(
-			"SendBlocks: Sent batch, updating latest ingested block number",
-			"blockNumberFirst", blockBatch[0].BlockNumber,
-			"blockNumberLast", lastBlockNumber,
-			"batchSize", len(blockBatch),
-		)
 		atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
 	}
 
-	return nextNumberToSend, nil
+	i.log.Info(
+		"Sent completed blocks to DuneAPI",
+		"bufferSize", len(collectedBlocks),
+		"nextBlockToSend", nextBlockToSend,
+	)
+	return nextBlockToSend, nil
 }
 
 func (i *ingester) tryUpdateLatestBlockNumber() int64 {
```
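trySendCompletedBlocks drains a reorder buffer: blocks land in collectedBlocks in whatever order they were fetched, and only the contiguous run starting at nextBlockToSend is sent. A stripped-down sketch of that technique, with simplified types and hypothetical names:

```go
package ingester

// drainContiguous collects the in-order run starting at next, up to
// maxBatch items. Anything after a gap stays buffered until the missing
// block arrives. The string payload is a stand-in for models.RPCBlock.
func drainContiguous(buf map[int64]string, next int64, maxBatch int) ([]string, int64) {
	batch := make([]string, 0, maxBatch)
	for len(batch) < maxBatch {
		block, ok := buf[next]
		if !ok {
			break // gap in the sequence: stop and wait for the straggler
		}
		batch = append(batch, block)
		delete(buf, next)
		next++
	}
	return batch, next
}
```

Because the function returns the updated cursor, the caller can invoke it repeatedly as new blocks arrive and resume exactly where the last contiguous run ended.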
