diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 7a00920..da1e611 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -184,17 +184,22 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch } else { c.log.Info("Sent progress report", "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "latestBlockNumber", request.LatestBlockNumber, "duration", time.Since(start), ) } }() + request = BlockchainProgress{ + LastIngestedBlockNumber: progress.LastIngestedBlockNumber, + LatestBlockNumber: progress.LatestBlockNumber, + } url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) - c.log.Debug("Sending request", "url", url) - payload, err := json.Marshal(progress) + payload, err := json.Marshal(request) if err != nil { return err } + c.log.Info("Sending request", "url", url, "payload", string(payload)) req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) if err != nil { return err diff --git a/cmd/main.go b/cmd/main.go index c93c203..74c7010 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -78,7 +78,7 @@ func main() { wg.Add(1) go func() { defer wg.Done() - err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */) + err := ingester.Run(ctx, 0 /* maxCount */) logger.Info("Ingester finished", "err", err) }() diff --git a/ingester/ingester.go b/ingester/ingester.go index e3f7bc7..efe7da2 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -12,7 +12,7 @@ import ( type Ingester interface { // Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested - Run(ctx context.Context, startBlockNumber, maxCount int64) error + Run(ctx context.Context, maxCount int64) error // ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive. // If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 79d0b2d..0ede22e 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -11,28 +11,32 @@ import ( "golang.org/x/sync/errgroup" ) -func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error { +func (i *ingester) Run(ctx context.Context, maxCount int64) error { inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) defer close(inFlightChan) - var err error - - if startBlockNumber < 0 { - startBlockNumber, err = i.node.LatestBlockNumber() - if err != nil { - return errors.Errorf("failed to get latest block number: %w", err) - } + progress, err := i.dune.GetProgressReport(ctx) + if err != nil { + i.log.Error("failed to get progress report", "err", err) + return err } + // Start ingesting from where we left off + startBlockNumber := progress.LastIngestedBlockNumber + 1 + // Ingest until endBlockNumber, inclusive. If maxCount is 0, we ingest forever + endBlockNumber := startBlockNumber + maxCount - 1 + i.log.Info("Starting ingester", "maxBatchSize", i.cfg.MaxBatchSize, "startBlockNumber", startBlockNumber, + "endBlockNumber", endBlockNumber, + "latestBlockNumber", progress.LatestBlockNumber, "maxCount", maxCount, ) errGroup, ctx := errgroup.WithContext(ctx) errGroup.Go(func() error { - return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount) + return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber) }) errGroup.Go(func() error { return i.SendBlocks(ctx, inFlightChan) @@ -44,7 +48,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks { return err } - return nil + + return i.Close() } var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") @@ -53,10 +58,9 @@ var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") func (i *ingester) ConsumeBlocks( ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64, ) error { - dontStop := endBlockNumber <= startBlockNumber latestBlockNumber := i.tryUpdateLatestBlockNumber() - waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 { + waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 { for blockNumber > latestBlockNumber { select { case <-ctx.Done(): @@ -72,6 +76,9 @@ func (i *ingester) ConsumeBlocks( return latestBlockNumber } + // Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0 + dontStop := endBlockNumber < startBlockNumber + for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber) startTime := time.Now() @@ -80,7 +87,7 @@ func (i *ingester) ConsumeBlocks( if err != nil { if errors.Is(err, context.Canceled) { i.log.Info("Context canceled, stopping..") - return err + return nil } i.log.Error("Failed to get block by number, continuing..", @@ -103,7 +110,7 @@ func (i *ingester) ConsumeBlocks( select { case <-ctx.Done(): - return ctx.Err() + return nil case outChan <- block: } @@ -119,7 +126,9 @@ func (i *ingester) ConsumeBlocks( ) } } - return ErrFinishedConsumeBlocks // FIXME: this is wrong + // Done consuming blocks, either because we reached the endBlockNumber or the context was canceled + i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber) + return nil } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { @@ -202,10 +211,39 @@ func (i *ingester) ReportProgress(ctx context.Context) error { previousIngested = lastIngested previousDistance = newDistance previousTime = tNow + + // TODO: include errors in the report, reset the error list + err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{ + BlockchainName: i.cfg.BlockchainName, + EVMStack: i.cfg.Stack.String(), + LastIngestedBlockNumber: lastIngested, + LatestBlockNumber: latest, + }) + if err != nil { + i.log.Error("Failed to post progress report", "error", err) + } } } } func (i *ingester) Close() error { + // Send a final progress report to flush progress + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + i.log.Info("Sending final progress report") + err := i.dune.PostProgressReport( + ctx, + models.BlockchainIndexProgress{ + BlockchainName: i.cfg.BlockchainName, + EVMStack: i.cfg.Stack.String(), + LastIngestedBlockNumber: i.info.IngestedBlockNumber, + LatestBlockNumber: i.info.LatestBlockNumber, + }) + i.log.Info("Closing node") + if err != nil { + _ = i.node.Close() + return err + } + return i.node.Close() } diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 3b03d94..164737c 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -68,8 +68,14 @@ func TestBlockConsumptionLoopErrors(t *testing.T) { Payload: []byte(`block`), }, nil }, + CloseFunc: func() error { + return nil + }, } - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{ + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, nil, ingester.Config{ MaxBatchSize: 1, PollInterval: 1000 * time.Millisecond, }) @@ -77,7 +83,7 @@ func TestBlockConsumptionLoopErrors(t *testing.T) { outCh := make(chan models.RPCBlock, maxBlockNumber+1) defer close(outCh) err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber) - require.Error(t, err) // this is expected + require.NoError(t, err) if tc.BlockByNumberIsBroken { require.Equal(t, producedBlockNumber, int64(0)) } @@ -100,23 +106,21 @@ func TestBlockSendingLoop(t *testing.T) { func TestRunLoopBaseCase(t *testing.T) { testCases := []struct { - name string - i int64 + name string + maxCount int64 + lastIngested int64 }{ - {name: "1 block", i: 1}, - {name: "100 blocks", i: 100}, + {name: "1 block", maxCount: 1, lastIngested: 0}, + {name: "2 blocks", maxCount: 2, lastIngested: 0}, + {name: "100 blocks", maxCount: 100, lastIngested: 0}, + {name: "100 blocks, starting from 50", maxCount: 100, lastIngested: 50}, } sentBlockNumber := int64(0) producedBlockNumber := int64(0) - duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - return nil - }, - } + latestBlockNumber := int64(1000) rpcClient := &jsonrpc_mock.BlockchainClientMock{ LatestBlockNumberFunc: func() (int64, error) { - return 1000, nil + return latestBlockNumber, nil }, BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { atomic.StoreInt64(&producedBlockNumber, blockNumber) @@ -125,25 +129,55 @@ func TestRunLoopBaseCase(t *testing.T) { Payload: []byte(`block`), }, nil }, + CloseFunc: func() error { + return nil + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) + return nil + }, + GetProgressReportFunc: func(_ context.Context) (*models.BlockchainIndexProgress, error) { + return &models.BlockchainIndexProgress{ + LatestBlockNumber: latestBlockNumber, + LastIngestedBlockNumber: tc.lastIngested, + }, nil + }, + PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + return nil + }, + } + + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ MaxBatchSize: 1, PollInterval: 1000 * time.Millisecond, }) - err := ing.Run(context.Background(), 0, tc.i) + ctx, cancel := context.WithCancel(context.Background()) + + // Cancel top level context (only for progress report goroutine) + go func() { + <-time.After(100 * time.Millisecond) + cancel() + }() + + err := ing.Run(ctx, tc.maxCount) require.NoError(t, err) - require.Equal(t, producedBlockNumber, tc.i) - require.Equal(t, sentBlockNumber, tc.i) + require.Equal(t, tc.lastIngested+tc.maxCount, producedBlockNumber) + require.Equal(t, tc.lastIngested+tc.maxCount, sentBlockNumber) }) } } func TestRunLoopUntilCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - maxBlockNumber := int64(1000) + maxBlockNumber := int64(10) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ @@ -155,6 +189,12 @@ func TestRunLoopUntilCancel(t *testing.T) { } return nil }, + GetProgressReportFunc: func(_ context.Context) (*models.BlockchainIndexProgress, error) { + return &models.BlockchainIndexProgress{LatestBlockNumber: producedBlockNumber}, nil + }, + PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + return nil + }, } rpcClient := &jsonrpc_mock.BlockchainClientMock{ LatestBlockNumberFunc: func() (int64, error) { @@ -167,13 +207,19 @@ func TestRunLoopUntilCancel(t *testing.T) { Payload: []byte(`block`), }, nil }, + CloseFunc: func() error { + return nil + }, } - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ MaxBatchSize: 1, PollInterval: 1000 * time.Millisecond, }) - err := ing.Run(ctx, 0, maxBlockNumber) + err := ing.Run(ctx, maxBlockNumber) require.NoError(t, err) require.Equal(t, producedBlockNumber, maxBlockNumber) require.Equal(t, sentBlockNumber, maxBlockNumber)