diff --git a/client/duneapi/client.go b/client/duneapi/client.go
index a7dc13f..c2af899 100644
--- a/client/duneapi/client.go
+++ b/client/duneapi/client.go
@@ -129,7 +129,7 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
 		req.Header.Set("Content-Encoding", request.ContentEncoding)
 	}
 	req.Header.Set("Content-Type", "application/x-ndjson")
-	req.Header.Set("x-idempotency-key", request.IdempotencyKey)
+	req.Header.Set("idempotency-key", request.IdempotencyKey)
 	req.Header.Set("x-dune-evm-stack", request.EVMStack)
 	req.Header.Set("x-dune-api-key", c.cfg.APIKey)
 	req = req.WithContext(ctx)
diff --git a/ingester/ingester.go b/ingester/ingester.go
index 32de024..6300057 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -14,10 +14,13 @@ type Ingester interface {
 	// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
 	Run(ctx context.Context, startBlockNumber, maxCount int64) error
 
+	// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
+	ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber, endBlockNumber int64) error
+
-	// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
-	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
-	// it will run continuously until the context is cancelled
-	ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
+	// ConsumeBlocks fetches a block for every block number received on inChan and sends it to outChan.
+	// It runs until inChan is closed or the context is cancelled.
+	ConsumeBlocks(ctx context.Context, inChan chan int64, outChan chan models.RPCBlock) error
 
 	// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
 	// it will block until:
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index 79d0b2d..321d33e 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -11,18 +11,20 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
 func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
 	inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
-	defer close(inFlightChan)
-	var err error
+	blockChan := make(chan int64, i.cfg.MaxBatchSize)
 
-	if startBlockNumber < 0 {
-		startBlockNumber, err = i.node.LatestBlockNumber()
-		if err != nil {
-			return errors.Errorf("failed to get latest block number: %w", err)
-		}
-	}
+	// TODO: Use progress endpoint
+	// TODO: Fetch blocks in parallel
+
+	latestBlockNumber := i.tryUpdateLatestBlockNumber()
+	i.log.Info("Got latest block number", "latestBlockNumber", latestBlockNumber)
 
 	i.log.Info("Starting ingester",
 		"maxBatchSize", i.cfg.MaxBatchSize,
@@ -32,7 +34,12 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er
 
 	errGroup, ctx := errgroup.WithContext(ctx)
 	errGroup.Go(func() error {
-		return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
+		// Each stage closes the channel it writes to when it returns,
+		// so the downstream stage can terminate cleanly.
+		defer close(blockChan)
+		return i.ProduceBlockNumbers(ctx, blockChan, startBlockNumber, startBlockNumber+maxCount)
+	})
+	errGroup.Go(func() error {
+		defer close(inFlightChan)
+		return i.ConsumeBlocks(ctx, blockChan, inFlightChan)
 	})
 	errGroup.Go(func() error {
 		return i.SendBlocks(ctx, inFlightChan)
@@ -41,98 +48,126 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er
 		return i.ReportProgress(ctx)
 	})
 
 	if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
 		return err
 	}
fmt.Println("exiting run") return nil } -var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") +// ProduceBlockNumbers on a channel +func (i *ingester) ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber, endBlockNumber int64) error { + blockNumber := startBlockNumber -// ConsumeBlocks from the NPC Node -func (i *ingester) ConsumeBlocks( - ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64, -) error { - dontStop := endBlockNumber <= startBlockNumber - latestBlockNumber := i.tryUpdateLatestBlockNumber() + fmt.Println("starting produce", startBlockNumber, endBlockNumber) - waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 { - for blockNumber > latestBlockNumber { - select { - case <-ctx.Done(): - return latestBlockNumber - case <-time.After(i.cfg.PollInterval): - } - i.log.Debug(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval), - "blockNumber", blockNumber, - "latestBlockNumber", latestBlockNumber, - ) - latestBlockNumber = i.tryUpdateLatestBlockNumber() + // TODO: Update latest block number if we're caught up + + for blockNumber < i.info.LatestBlockNumber && blockNumber <= endBlockNumber { + // i.log.Info("Attempting to produce block number", "blockNumber", blockNumber) + fmt.Println("lol") + select { + case <-ctx.Done(): + return nil + case outChan <- blockNumber: + i.log.Info("Produced block number", "blockNumber", blockNumber) + blockNumber++ } - return latestBlockNumber } + fmt.Println("exiting produce") + return nil +} - for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { - latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber) - startTime := time.Now() +var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") - block, err := i.node.BlockByNumber(ctx, blockNumber) - if err != nil { - if errors.Is(err, context.Canceled) { - i.log.Info("Context canceled, stopping..") - return err +// ConsumeBlocks from the NPC Node +func (i *ingester) ConsumeBlocks( + ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock, +) error { + for { + select { + case <-ctx.Done(): + return nil // context canceled + case blockNumber, ok := <-blockNumbers: + fmt.Println("got block number", blockNumber) + // TODO: we should batch RPC blocks here before sending to Dune. + if !ok { + fmt.Println("channel was closed") + fmt.Println("exiting consume") + return nil // channel closed } + i.log.Info("Got block number", "blockNumber", blockNumber) + startTime := time.Now() + block, err := i.node.BlockByNumber(ctx, blockNumber) + if err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("Context canceled, stopping..") + fmt.Println("exiting consume") + return err + } + + i.log.Error("Failed to get block by number, continuing..", + "blockNumber", blockNumber, + "latestBlockNumber", i.info.LatestBlockNumber, + "error", err, + ) + i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ + Timestamp: time.Now(), + BlockNumber: blockNumber, + Error: err, + }) - i.log.Error("Failed to get block by number, continuing..", - "blockNumber", blockNumber, - "latestBlockNumber", latestBlockNumber, - "error", err, - ) - i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: blockNumber, - Error: err, - }) - - // TODO: should I sleep (backoff) here? 
-			continue
-		}
-
-		atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
-		getBlockElapsed := time.Since(startTime)
-
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case outChan <- block:
-		}
-
-		distanceFromLatest := latestBlockNumber - block.BlockNumber
-		if distanceFromLatest > 0 {
-			// TODO: improve logs of processing speed and catchup estimated ETA
-			i.log.Info("We're behind, trying to catch up..",
-				"blockNumber", block.BlockNumber,
-				"latestBlockNumber", latestBlockNumber,
-				"distanceFromLatest", distanceFromLatest,
-				"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
-				"elapsedMillis", time.Since(startTime).Milliseconds(),
-			)
-		}
-	}
-	return ErrFinishedConsumeBlocks // FIXME: this is wrong
-}
+// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
+func (i *ingester) ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber, endBlockNumber int64) error {
+	latestBlockNumber := atomic.LoadInt64(&i.info.LatestBlockNumber)
+
+	// TODO: refresh the latest block number once we catch up to the tip of the chain
+	for blockNumber := startBlockNumber; blockNumber <= latestBlockNumber && blockNumber <= endBlockNumber; blockNumber++ {
+		select {
+		case <-ctx.Done():
+			return nil
+		case outChan <- blockNumber:
+			i.log.Debug("Produced block number", "blockNumber", blockNumber)
+		}
+	}
+	return nil
+}
+
+var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
+
+// ConsumeBlocks fetches a block from the RPC node for each block number received on
+// blockNumbers and sends the result to the blocks channel.
+func (i *ingester) ConsumeBlocks(
+	ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
+) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil // context canceled
+		case blockNumber, ok := <-blockNumbers:
+			if !ok {
+				return nil // channel closed, all block numbers were consumed
+			}
+			startTime := time.Now()
+			block, err := i.node.BlockByNumber(ctx, blockNumber)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					i.log.Info("Context canceled, stopping..")
+					return err
+				}
+
+				i.log.Error("Failed to get block by number, continuing..",
+					"blockNumber", blockNumber,
+					"latestBlockNumber", atomic.LoadInt64(&i.info.LatestBlockNumber),
+					"error", err,
+				)
+				i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
+					Timestamp:   time.Now(),
+					BlockNumber: blockNumber,
+					Error:       err,
+				})
+
+				// TODO: should we sleep (backoff) here?
+				continue
+			}
+
+			atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
+			getBlockElapsed := time.Since(startTime)
+
+			// Send the block downstream, unless the context is cancelled first
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case blocks <- block:
+			}
+
+			latestBlockNumber := atomic.LoadInt64(&i.info.LatestBlockNumber)
+			distanceFromLatest := latestBlockNumber - block.BlockNumber
+			if distanceFromLatest > 0 {
+				// TODO: improve logs of processing speed and catchup estimated ETA
+				i.log.Info("We're behind, trying to catch up..",
+					"blockNumber", block.BlockNumber,
+					"latestBlockNumber", latestBlockNumber,
+					"distanceFromLatest", distanceFromLatest,
+					"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
+					"elapsedMillis", time.Since(startTime).Milliseconds(),
+				)
+			}
+		}
+	}
+}
 
 func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
 	for {
 		select {
 		case <-ctx.Done():
 			return nil // context canceled
 		case payload, ok := <-blocksCh:
-			// TODO: we should batch RCP blocks here before sending to Dune.
+			// TODO: we should batch RPC blocks here before sending to Dune.
 			if !ok {
 				return nil // channel closed
 			}
 			if err := i.dune.SendBlock(ctx, payload); err != nil {
 				// TODO: implement DeadLetterQueue
 				// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
 				i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index 3b03d94..8bc139c 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -69,15 +71,20 @@
 				},
 			}
 			ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{
 				MaxBatchSize: 1,
 				PollInterval: 1000 * time.Millisecond,
 			})
 
+			inCh := make(chan int64, maxBlockNumber+1)
+			go func() {
+				// Close inCh once production finishes so that ConsumeBlocks terminates.
+				defer close(inCh)
+				_ = ing.ProduceBlockNumbers(ctx, inCh, 1, maxBlockNumber)
+			}()
+
 			outCh := make(chan models.RPCBlock, maxBlockNumber+1)
 			defer close(outCh)
-			err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
-			require.Error(t, err) // this is expected
+			err := ing.ConsumeBlocks(ctx, inCh, outCh)
+			require.NoError(t, err)
 			if tc.BlockByNumberIsBroken {
 				require.Equal(t, producedBlockNumber, int64(0))
 			}
@@ -128,11 +135,13 @@
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
 				MaxBatchSize: 1,
 				PollInterval: 1000 * time.Millisecond,
 			})
 
+			// TODO: cancel the context when done; as written, this deadlocks in ReportProgress
+
 			err := ing.Run(context.Background(), 0, tc.i)
 			require.NoError(t, err)
 			require.Equal(t, producedBlockNumber, tc.i)
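Note for reviewers: the new `Run` wiring is the standard errgroup fan-out pipeline, with one goroutine producing block numbers, one consuming them into `models.RPCBlock`s, and one sending those to Dune. Below is a minimal, self-contained sketch of the same pattern, not part of this codebase; the names (`numbers`, `results`, string payloads standing in for `models.RPCBlock`) are illustrative only.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	numbers := make(chan int64, 8)  // block numbers to fetch
	results := make(chan string, 8) // stand-in for models.RPCBlock

	g, ctx := errgroup.WithContext(context.Background())

	// Producer: emits block numbers, then closes its channel so the
	// consumer's receive loop can terminate.
	g.Go(func() error {
		defer close(numbers)
		for n := int64(1); n <= 5; n++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case numbers <- n:
			}
		}
		return nil
	})

	// Consumer: "fetches" each block and forwards it downstream.
	g.Go(func() error {
		defer close(results)
		for n := range numbers {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case results <- fmt.Sprintf("block %d", n):
			}
		}
		return nil
	})

	// Sender: drains results until the channel is closed.
	g.Go(func() error {
		for r := range results {
			fmt.Println("sending", r)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
```

Two properties make this shutdown-safe, and the PR relies on both: `errgroup.WithContext` cancels the shared context as soon as any stage returns an error, and each stage closes the channel it writes to (via `defer close(...)`) when it returns, so downstream receive loops always unblock and `g.Wait()` cannot deadlock.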