Extract ProduceBlocks from ConsumeBlocks #14

Closed
wants to merge 1 commit into from
2 changes: 1 addition & 1 deletion client/duneapi/client.go
@@ -129,7 +129,7 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
req.Header.Set("Content-Encoding", request.ContentEncoding)
}
req.Header.Set("Content-Type", "application/x-ndjson")
req.Header.Set("x-idempotency-key", request.IdempotencyKey)
req.Header.Set("idempotency-key", request.IdempotencyKey)
req.Header.Set("x-dune-evm-stack", request.EVMStack)
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
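The only functional change in this file is the idempotency header name: x-idempotency-key becomes idempotency-key. As a rough, standalone sketch of how the renamed header could be checked end to end, the snippet below captures it with an httptest server; the request construction loosely mirrors sendRequest, but the key value and everything else here are illustrative assumptions, not code from this repository.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Fake ingest endpoint that records the idempotency header it receives.
	var got string
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		got = r.Header.Get("idempotency-key")
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	// Build the request roughly the way sendRequest does, with the renamed header.
	req, err := http.NewRequest(http.MethodPost, srv.URL, nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/x-ndjson")
	req.Header.Set("idempotency-key", "block-123") // hypothetical key value
	if _, err := http.DefaultClient.Do(req); err != nil {
		panic(err)
	}
	fmt.Println("server saw idempotency-key:", got)
}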
5 changes: 4 additions & 1 deletion ingester/ingester.go
@@ -14,10 +14,13 @@ type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error

// TODO
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber, endBlockNumber int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
ConsumeBlocks(ctx context.Context, inChan chan int64, outChan chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
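This interface change is the core of the PR: choosing which block numbers to ingest (ProduceBlockNumbers) is split from fetching those blocks over RPC (ConsumeBlocks), with a channel of block numbers connecting the two stages. The standalone sketch below shows the general shape of such a producer/consumer/sender pipeline under errgroup; rpcBlock and the stage bodies are illustrative stand-ins for models.RPCBlock and the real ingester methods, not the repository's code.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// rpcBlock stands in for models.RPCBlock; it is illustrative only.
type rpcBlock struct{ BlockNumber int64 }

func main() {
	ctx := context.Background()
	blockNumbers := make(chan int64, 10)
	blocks := make(chan rpcBlock, 10)

	g, ctx := errgroup.WithContext(ctx)

	// Producer: emit block numbers, then close the channel to signal completion.
	g.Go(func() error {
		defer close(blockNumbers)
		for n := int64(1); n <= 5; n++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case blockNumbers <- n:
			}
		}
		return nil
	})

	// Consumer: turn numbers into "blocks" until the input channel closes.
	g.Go(func() error {
		defer close(blocks)
		for n := range blockNumbers {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case blocks <- rpcBlock{BlockNumber: n}:
			}
		}
		return nil
	})

	// Sender: drain blocks until the channel closes.
	g.Go(func() error {
		for b := range blocks {
			fmt.Println("sending block", b.BlockNumber)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("pipeline error:", err)
	}
}

One design note: having the goroutine that writes to a channel also close it (as the deferred closes above do, and as Run does in the diff below) is what lets each downstream stage exit cleanly by ranging over the channel or by checking the ok flag on receive.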
189 changes: 115 additions & 74 deletions ingester/mainloop.go
@@ -11,18 +11,20 @@
"golang.org/x/sync/errgroup"
)

func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
defer close(inFlightChan)

var err error
blockChan := make(chan int64, i.cfg.MaxBatchSize)

if startBlockNumber < 0 {
startBlockNumber, err = i.node.LatestBlockNumber()
if err != nil {
return errors.Errorf("failed to get latest block number: %w", err)
}
}
// var err error

// TODO: Use progress endpoint
// TODO: Fetch blocks in parallel

i.tryUpdateLatestBlockNumber()
i.log.Info("Got latest block number", "latestBlockNumber", i.info.LatestBlockNumber)

// endBlockNumber := startBlockNumber+maxCount

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
@@ -32,7 +34,12 @@

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
defer close(blockChan)
return i.ProduceBlockNumbers(ctx, blockChan, startBlockNumber, startBlockNumber+maxCount)
})
errGroup.Go(func() error {
defer close(inFlightChan)
return i.ConsumeBlocks(ctx, blockChan, inFlightChan)
})
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
@@ -41,98 +48,126 @@
return i.ReportProgress(ctx)
})

fmt.Println("waiting")

Check failure on line 51 in ingester/mainloop.go (GitHub Actions / Lint and test): use of `fmt.Println` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
fmt.Println("errro in wait")

Check failure on line 53 in ingester/mainloop.go (GitHub Actions / Lint and test): use of `fmt.Println` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
return err
}
fmt.Println("exiting run")

Check failure on line 56 in ingester/mainloop.go (GitHub Actions / Lint and test): use of `fmt.Println` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
return nil
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
// ProduceBlockNumbers on a channel
func (i *ingester) ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber, endBlockNumber int64) error {
blockNumber := startBlockNumber

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()
fmt.Println("starting produce", startBlockNumber, endBlockNumber)

waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
return latestBlockNumber
case <-time.After(i.cfg.PollInterval):
}
i.log.Debug(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
)
latestBlockNumber = i.tryUpdateLatestBlockNumber()
// TODO: Update latest block number if we're caught up

for blockNumber < i.info.LatestBlockNumber && blockNumber <= endBlockNumber {
// i.log.Info("Attempting to produce block number", "blockNumber", blockNumber)
fmt.Println("lol")
select {
case <-ctx.Done():
return nil
case outChan <- blockNumber:
i.log.Info("Produced block number", "blockNumber", blockNumber)
blockNumber++
}
return latestBlockNumber
}
fmt.Println("exiting produce")
return nil
}

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()
var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return err
// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
return nil // context canceled
case blockNumber, ok := <-blockNumbers:
fmt.Println("got block number", blockNumber)
// TODO: we should batch RPC blocks here before sending to Dune.
if !ok {
fmt.Println("channel was closed")
fmt.Println("exiting consume")
return nil // channel closed
}
i.log.Info("Got block number", "blockNumber", blockNumber)
startTime := time.Now()
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
fmt.Println("exiting consume")
return err
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", i.info.LatestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should I sleep (backoff) here?
continue
}
i.log.Info("Waiting for block number", "blockNumber", blockNumber)
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)

select {
case <-ctx.Done():
return ctx.Err()
case outChan <- block:
}
// Send block
select {
case <-ctx.Done():
fmt.Println("exiting consume ctx err")
return ctx.Err()
case blocks <- block:
}

distanceFromLatest := latestBlockNumber - block.BlockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"latestBlockNumber", latestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
distanceFromLatest := i.info.LatestBlockNumber - block.BlockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"latestBlockNumber", i.info.LatestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
}
i.log.Info("Waiting for block number", "blockNumber", blockNumber)
}
}
return ErrFinishedConsumeBlocks // FIXME: this is wrong
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for {
fmt.Println("Hello")
select {
case <-ctx.Done():
case _, ok := <-ctx.Done():
fmt.Println("ctx done, exiting send", ok)
return nil // context canceled
case payload, ok := <-blocksCh:
// TODO: we should batch RCP blocks here before sending to Dune.
// TODO: we should batch RPC blocks here before sending to Dune.
if !ok {
fmt.Println("sendblocks: channel closedd")
fmt.Println("exiting send")
return nil // channel closed
}
if err := i.dune.SendBlock(ctx, payload); err != nil {
fmt.Println("sending block", payload.BlockNumber)
err := i.dune.SendBlock(ctx, payload)
if err != nil {
fmt.Println("got block err")
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
@@ -144,6 +179,7 @@
} else {
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
fmt.Printf("got result %v\n", err)

Check failure on line 182 in ingester/mainloop.go (GitHub Actions / Lint and test): use of `fmt.Printf` forbidden by pattern `^(fmt\.Print(|f|ln)|print|println)$` (forbidigo)
}
}
}
@@ -167,8 +203,13 @@
previousIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)

for {
if ctx.Err() != nil {
fmt.Println("exiting report progress ctx err")
return ctx.Err()
}
select {
case <-ctx.Done():
case _, ok := <-ctx.Done():
fmt.Println("exiting report progress", ok)
return nil
case tNow := <-timer.C:
latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
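One TODO left in the new ProduceBlockNumbers is refreshing the latest block number once the producer has caught up with the chain tip; the old ConsumeBlocks handled this with its waitForBlock helper and PollInterval. The standalone sketch below shows one way that polling could look on the producer side; produceUntilTip, the latest callback, and the fake node are assumptions for illustration, not the PR's implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// produceUntilTip emits block numbers start..end; when the next number is
// beyond the current chain tip it polls for a new tip at pollInterval.
func produceUntilTip(ctx context.Context, out chan<- int64, start, end int64,
	latest func() (int64, error), pollInterval time.Duration,
) error {
	tip, err := latest()
	if err != nil {
		return err
	}
	for n := start; n <= end; n++ {
		for n > tip { // wait until the chain has reached block n
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(pollInterval):
			}
			if tip, err = latest(); err != nil {
				return err
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- n:
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Fake node: the tip advances by one block each time it is queried.
	tip := int64(2)
	latest := func() (int64, error) { tip++; return tip, nil }

	out := make(chan int64, 8)
	go func() {
		defer close(out)
		_ = produceUntilTip(ctx, out, 1, 6, latest, 10*time.Millisecond)
	}()
	for n := range out {
		fmt.Println("produced", n)
	}
}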
23 changes: 16 additions & 7 deletions ingester/mainloop_test.go
@@ -2,8 +2,9 @@ package ingester_test

import (
"context"
"io"
"fmt"
"log/slog"
"os"
"sync/atomic"
"testing"
"time"
@@ -47,6 +48,7 @@ func TestBlockConsumptionLoopErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(100)
producedBlockNumber := int64(0)

rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
if tc.LatestIsBroken {
@@ -69,15 +71,20 @@
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{
ing := ingester.New(slog.New(slog.NewTextHandler(os.Stderr, nil)), rpcClient, nil, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

inCh := make(chan int64, maxBlockNumber+1)
go ing.ProduceBlockNumbers(ctx, inCh, 1, maxBlockNumber)

fmt.Println("hello")

outCh := make(chan models.RPCBlock, maxBlockNumber+1)
defer close(outCh)
err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
require.Error(t, err) // this is expected
err := ing.ConsumeBlocks(ctx, inCh, outCh)
require.Nil(t, err) // this is expected
if tc.BlockByNumberIsBroken {
require.Equal(t, producedBlockNumber, int64(0))
}
@@ -128,11 +135,13 @@ func TestRunLoopBaseCase(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
ing := ingester.New(slog.New(slog.NewTextHandler(os.Stderr, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

// TODO: Cancel when done, this deadlocks in ReportProgress

err := ing.Run(context.Background(), 0, tc.i)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, tc.i)
@@ -149,6 +158,7 @@ func TestRunLoopUntilCancel(t *testing.T) {
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
fmt.Println(block.BlockNumber, maxBlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
cancel()
@@ -168,11 +178,10 @@
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
ing := ingester.New(slog.New(slog.NewTextHandler(os.Stderr, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, maxBlockNumber)
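A small side note on the test change: ProduceBlockNumbers is launched with a bare go statement, so its return value is dropped. The standalone sketch below shows one way to keep that error observable by forwarding it over a buffered channel; produceNumbers is an illustrative stand-in for the real method, not the repository's code.

package main

import (
	"context"
	"errors"
	"fmt"
)

// produceNumbers stands in for ingester.ProduceBlockNumbers; illustrative only.
func produceNumbers(ctx context.Context, out chan<- int64, start, end int64) error {
	defer close(out)
	for n := start; n <= end; n++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- n:
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	numbers := make(chan int64, 8)
	produceErr := make(chan error, 1)

	// Run the producer in a goroutine, but forward its error instead of
	// discarding it with a bare `go f()` call, so the caller can assert on it.
	go func() {
		produceErr <- produceNumbers(ctx, numbers, 1, 5)
	}()

	for n := range numbers {
		fmt.Println("consumed", n)
	}
	if err := <-produceErr; err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("producer failed:", err)
	}
}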