From b14bdcfb50e9592a9f8318711ce198bacf6dbd1f Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Thu, 6 Jun 2024 13:27:13 +0200 Subject: [PATCH] Handle context cancellation --- cmd/main.go | 5 +++-- config/config.go | 2 +- ingester/mainloop.go | 14 +++++++++++--- ingester/mainloop_test.go | 2 +- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ec97a66..e74c992 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -69,16 +69,17 @@ func main() { }, ) + ctx, cancelFn := context.WithCancel(context.Background()) + wg.Add(1) go func() { defer wg.Done() - err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */) + err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */) logger.Info("Ingester finished", "err", err) }() // TODO: add a metrics exporter or healthcheck http endpoint ? - _, cancelFn := context.WithCancel(context.Background()) quit := make(chan os.Signal, 1) // handle Interrupt (ctrl-c) Term, used by `kill` et al, HUP which is commonly used to reload configs signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) diff --git a/config/config.go b/config/config.go index 2993adb..9ecd089 100644 --- a/config/config.go +++ b/config/config.go @@ -35,7 +35,7 @@ type Config struct { BlockHeight int64 `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"` // nolint:lll BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll Dune DuneClient - PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500millis"` // nolint:lll + PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll RPCNode RPCClient RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll } diff --git a/ingester/mainloop.go b/ingester/mainloop.go index fe6aea1..04d788b 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -57,8 +57,12 @@ func (i *ingester) ConsumeBlocks( latestBlockNumber := i.tryUpdateLatestBlockNumber() waitForBlock := func(blockNumber, latestBlockNumber int64) int64 { - // TODO: handle cancellation here for blockNumber > latestBlockNumber { + select { + case <-ctx.Done(): + return latestBlockNumber + default: + } i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber, @@ -70,7 +74,6 @@ func (i *ingester) ConsumeBlocks( } for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { - latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber) startTime := time.Now() @@ -86,6 +89,11 @@ func (i *ingester) ConsumeBlocks( BlockNumber: blockNumber, Error: err, }) + + if errors.Is(err, context.Canceled) { + return err + } + // TODO: should I sleep (backoff) here? continue } @@ -118,7 +126,7 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo for { select { case <-ctx.Done(): - return ctx.Err() + return nil // context canceled case payload, ok := <-blocksCh: // TODO: we should batch RCP blocks here before sending to Dune. if !ok { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 3d8fc96..27496ab 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -174,7 +174,7 @@ func TestRunLoopUntilCancel(t *testing.T) { }) err := ing.Run(ctx, 0, maxBlockNumber) - require.ErrorIs(t, err, context.Canceled) + require.NoError(t, err) require.Equal(t, producedBlockNumber, maxBlockNumber) require.Equal(t, sentBlockNumber, maxBlockNumber) }