-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fetch blocks concurrently but send in order #32
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
fb06900
to
0b18551
Compare
04ee907
to
aa11815
Compare
|
||
// Send this block only if we have sent all previous blocks | ||
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] { | ||
i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be an isolated function:
next = trySendCompletedBlocks(blockMap, next, ...)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah good call!
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error { | ||
i.log.Info("SendBlocks: Starting to receive blocks") | ||
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order | ||
next := startBlockNumber |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nextNumberToSend ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, agreed
i.log.Info("Received block", "blockNumber", block.BlockNumber) | ||
|
||
// Send this block only if we have sent all previous blocks | ||
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is smart for loop :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ChatGPT showed me this 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really cool, but we definitely need TESTS to prove it is correct.
ingester/mainloop_test.go
Outdated
// logOutput := os.Stderr | ||
logOutput := io.Discard | ||
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ | ||
MaxBatchSize: 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need a test with batchSize > 2 and where we assert that on duneclient (mock) we never receive blocks out of order...
(and the mocked rpcClient takes some random microsecond times on BlockByNumber
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, good catch that batch size is only 1 here, oops
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test doesn't use the RPC client at all, though, we are sending blocks out of order to the SendBlock goroutine.
But you're probably asking for an integration test with that scenario, though! I'll add it
aa11815
to
b0975b0
Compare
sentBlockNumber := int64(0) | ||
producedBlockNumber := int64(0) | ||
duneapi := &duneapi_mock.BlockchainIngesterMock{ | ||
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { | ||
// DuneAPI must fail if it receives blocks out of order | ||
if block.BlockNumber != sentBlockNumber+1 { | ||
return errors.Errorf("blocks out of order") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can simply require.Equal()
to abort the test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SHIP IT
This PR changes the node indexer from sending one block at a time to sending a batch of blocks. Earlier we implemented concurrent block fetching with buffering (#32). On a configurable interval (defaults to every second), we now check the buffer and send all possible blocks. We add a flag for specifying whether you wish to skip a failed block or not. It's off by default. This means if all the retries to the RPC node fails for a given block, we will crash. This ensures no block gaps. --------- Co-authored-by: Miguel Filipe <[email protected]>
This PR rearchitects the main loop such that we can run multiple goroutines to fetch blocks concurrently, but send them in order to the API to ensure we don't get gaps. We produce block numbers to fetch on an unbuffered channel (
ProduceBlockNumbers
), and each concurrentConsumeBlock
goroutine gets a block number from that channel. TheSendBlocks
goroutine receives all blocks on an unbuffered channel, but buffers them in a map until they can be sent in order.I deleted the test that asserted the exact number of blocks that were sent. I couldn't manage to get it working without off by one errors sometimes. Since the main use case is running it forever and not with a given blocks, I figured this was an OK decision. Let me know.
Possibly problematic scenario: If block N - and only block N - is somehow problematic from the RPC node, and must be retried ~forever, what will happen? MaxBatchSize - 1 goroutines continue to fetch blocks from the RPC node successfully, causing the buffer in SendBlocks to fill up. The mechanism in this PR doesn't have a way to backpressure there. Is that a problem? If memory footprint is a concern, we could possibly compress it here: At this point, currently, the block response is uncompressed. If we compress it first, it would take a lot more blocks for this to consume a lot of memory.
Otherwise, if there are general problems with the RPC node such that all requests take a long time or need to be retried, we'll get backpressured because we limit the concurrent number of calls to the node to MaxBatchSize.
FWIW I'm running this locally and getting 12 blocks per second with 5 workers!