Skip to content

Commit

Permalink
Handle context cancellation (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 6, 2024
1 parent 423f955 commit 732e823
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 7 deletions.
5 changes: 3 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ func main() {
},
)

ctx, cancelFn := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()
err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */)
err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */)
logger.Info("Ingester finished", "err", err)
}()

// TODO: add a metrics exporter or healthcheck http endpoint ?

_, cancelFn := context.WithCancel(context.Background())
quit := make(chan os.Signal, 1)
// handle Interrupt (ctrl-c) Term, used by `kill` et al, HUP which is commonly used to reload configs
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll
BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
Dune DuneClient
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500millis"` // nolint:lll
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
}
Expand Down
14 changes: 11 additions & 3 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ func (i *ingester) ConsumeBlocks(
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
// TODO: handle cancellation here
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
return latestBlockNumber
default:
}
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
Expand All @@ -70,7 +74,6 @@ func (i *ingester) ConsumeBlocks(
}

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {

latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
startTime := time.Now()

Expand All @@ -86,6 +89,11 @@ func (i *ingester) ConsumeBlocks(
BlockNumber: blockNumber,
Error: err,
})

if errors.Is(err, context.Canceled) {
return err
}

// TODO: should I sleep (backoff) here?
continue
}
Expand Down Expand Up @@ -118,7 +126,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil // context canceled
case payload, ok := <-blocksCh:
// TODO: we should batch RCP blocks here before sending to Dune.
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestRunLoopUntilCancel(t *testing.T) {
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.ErrorIs(t, err, context.Canceled)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, maxBlockNumber)
require.Equal(t, sentBlockNumber, maxBlockNumber)
}

0 comments on commit 732e823

Please sign in to comment.