From 55e8daeba3660eba8a72a71e9f56ca38c7c24459 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Mon, 1 Jul 2024 10:22:49 +0200 Subject: [PATCH] Always send a batch when possible, even if it's small --- ingester/mainloop_test.go | 70 --------------------------------------- ingester/send.go | 4 --- 2 files changed, 74 deletions(-) diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 8700035..d65e6a4 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -190,76 +190,6 @@ func TestSendBlocks(t *testing.T) { require.Equal(t, int64(10), sentBlockNumber) } -func TestRunBlocksUseBatching(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - maxBlockNumber := int64(1000) - sentBlockNumber := int64(0) - producedBlockNumber := int64(0) - duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) == 0 { - return nil - } - - // Fail if we're not sending a batch of blocks - require.Greater(t, len(blocks), 1) - - next := sentBlockNumber + 1 - for _, block := range blocks { - // We cannot send blocks out of order to DuneAPI - require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber) - next++ - } - - lastBlockNumber := blocks[len(blocks)-1].BlockNumber - atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) - if lastBlockNumber >= maxBlockNumber { - // cancel execution when we have sent the last block - cancel() - return context.Canceled - } - - return nil - }, - PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { - return nil - }, - } - rpcClient := &jsonrpc_mock.BlockchainClientMock{ - LatestBlockNumberFunc: func() (int64, error) { - return maxBlockNumber + 1, nil - }, - BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { - // Get blocks out of order by sleeping for a random amount of time - atomic.StoreInt64(&producedBlockNumber, blockNumber) - return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil - }, - CloseFunc: func() error { - return nil - }, - } - // Swap these to see logs - // logOutput := os.Stderr - logOutput := io.Discard - ing := ingester.New( - slog.New(slog.NewTextHandler(logOutput, nil)), - rpcClient, - duneapi, - ingester.Config{ - MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines - // big enough compared to the time spent in block by number to ensure batching. We panic - // in the mocked Dune client if we don't get a batch of blocks (more than one block). - BlockSubmitInterval: 50 * time.Millisecond, - SkipFailedBlocks: false, - }, - nil, // progress - ) - - err := ing.Run(ctx, 1, -1) // run until canceled - require.ErrorIs(t, err, context.Canceled) // this is expected - require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) -} - // TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time. func TestRunBlocksOutOfOrder(t *testing.T) { diff --git a/ingester/send.go b/ingester/send.go index 8d22df7..ca5f5f7 100644 --- a/ingester/send.go +++ b/ingester/send.go @@ -70,10 +70,6 @@ func (i *ingester) trySendCompletedBlocks( nextBlockToSend int64, ) (int64, error) { for { - if len(collectedBlocks) < maxBatchSize/10 { - // if we have very little to send, wait for next tick to avoid tiny batches impacting throughput - return nextBlockToSend, nil - } nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize) if err != nil || nextBlock == nextBlockToSend { return nextBlock, err