Skip to content

Commit

Permalink
mainloop basic tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 6, 2024
1 parent f837488 commit 055d88b
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 23 deletions.
10 changes: 7 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
StartBlockHeight: cfg.BlockHeight,
PollInterval: cfg.PollInterval,
MaxBatchSize: 1,
},
)

wg.Add(1)
ingester.Run(context.Background(), &wg)
go func() {
defer wg.Done()
err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */)
logger.Info("Ingester finished", "err", err)
}()

// TODO: add a metrics exporter or healthcheck http endpoint ?

Expand Down
9 changes: 4 additions & 5 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"context"
"log/slog"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
Expand All @@ -12,7 +11,8 @@ import (
)

type Ingester interface {
Run(ctx context.Context, wg *sync.WaitGroup) error
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
Expand All @@ -33,9 +33,8 @@ type Ingester interface {
const defaultMaxBatchSize = 1

type Config struct {
MaxBatchSize int
StartBlockHeight int64
PollInterval time.Duration
MaxBatchSize int
PollInterval time.Duration
}

type Info struct {
Expand Down
24 changes: 13 additions & 11 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -12,30 +11,28 @@ import (
"golang.org/x/sync/errgroup"
)

func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()

func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
defer close(inFlightChan)

var err error

startBlockNumber := i.cfg.StartBlockHeight
if startBlockNumber <= 0 {
if startBlockNumber < 0 {
startBlockNumber, err = i.node.LatestBlockNumber()
if err != nil {
return errors.Errorf("failed to get latest block number: %w", err)
}
}

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockHeight", i.cfg.StartBlockHeight,
"startBlockNumber", startBlockNumber,
"maxCount", maxCount,
)

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, -1)
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
})
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
Expand All @@ -44,9 +41,14 @@ func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
return i.ReportProgress(ctx)
})

return errGroup.Wait()
if err := errGroup.Wait(); err != nil && err != errFinishedConsumeBlocks {
return err
}
return nil
}

var errFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
Expand All @@ -66,7 +68,7 @@ func (i *ingester) ConsumeBlocks(
return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ {
for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {

latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
startTime := time.Now()
Expand Down Expand Up @@ -108,7 +110,7 @@ func (i *ingester) ConsumeBlocks(
)
}
}
return nil
return errFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
Expand Down
100 changes: 96 additions & 4 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
package ingester_test

import "testing"
import (
"context"
"io"
"log/slog"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/duneanalytics/blockchain-ingester/ingester"
duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi"
jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/stretchr/testify/require"
)

func TestBlockConsumptionLoop(t *testing.T) {
testcases := []string{
"we're very behind, trying to catch up",
"we're up to date, following the head",
"we're erroring systematically, the RPC node is broken, all API calls are failing",
"we're erroring only on GetBlockByNumber, a specific jsonRPC on the RPC node is broken",
Expand All @@ -30,6 +43,85 @@ func TestBlockSendingLoop(t *testing.T) {
}
}

func TestRunLoopHappyCase(t *testing.T) {
t.Skip("not implemented")
func TestRunLoopBaseCase(t *testing.T) {
testCases := []struct {
name string
i int64
}{
{name: "1 block", i: 1},
{name: "100 blocks", i: 100},
}
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return 1000, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

var wg sync.WaitGroup
wg.Add(1)
err := ing.Run(context.Background(), 0, tc.i)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, tc.i)
require.Equal(t, sentBlockNumber, tc.i)
})
}
}

func TestRunLoopUntilCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
cancel()
}
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, maxBlockNumber)
require.Equal(t, sentBlockNumber, maxBlockNumber)
}

0 comments on commit 055d88b

Please sign in to comment.