Skip to content

Commit

Permalink
Move catchup up to listener level, use RPC client and add retries
Browse files Browse the repository at this point in the history
  • Loading branch information
iansuvak committed Aug 21, 2024
1 parent 5e4e2e9 commit 495cd08
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 22 deletions.
95 changes: 95 additions & 0 deletions relayer/catchup_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package relayer

import (
"context"
"math/big"
"time"

"go.uber.org/zap"
)

const (
MaxBlocksPerRequest = 200
// Max buffer size for subscription channels
maxClientSubscriptionBuffer = 20000
// Max number of retries for RPC calls before failing catchup
maxRpcRetries = 5
)

func (lstnr *Listener) processFromHeight(height *big.Int, done chan bool) {
defer close(done)
lstnr.logger.Info(
"Processing historical logs",
zap.String("fromBlockHeight", height.String()),
zap.String("blockchainID", lstnr.sourceBlockchain.BlockchainID),
)
if height == nil {
lstnr.logger.Error("Cannot process logs from nil height")
done <- false
return
}

// Grab the latest block before filtering logs so we don't miss any before updating the db
latestBlockHeight, err := lstnr.ethClient.BlockNumber(context.Background())
if err != nil {
lstnr.logger.Error(
"Failed to get latest block",
zap.String("blockchainID", lstnr.sourceBlockchain.BlockchainID),
zap.Error(err),
)
done <- false
return
}

bigLatestBlockHeight := big.NewInt(0).SetUint64(latestBlockHeight)

//nolint:lll
for fromBlock := big.NewInt(0).Set(height); fromBlock.Cmp(bigLatestBlockHeight) <= 0; fromBlock.Add(fromBlock, big.NewInt(MaxBlocksPerRequest)) {
toBlock := big.NewInt(0).Add(fromBlock, big.NewInt(MaxBlocksPerRequest-1))

// clamp to latest known block because we've already subscribed
// to new blocks and we don't want to double-process any blocks
// created after that subscription but before the determination
// of this "latest"
if toBlock.Cmp(bigLatestBlockHeight) > 0 {
toBlock.Set(bigLatestBlockHeight)
}

for i := 0; i < maxRpcRetries; i++ {
err = lstnr.processBlockRange(fromBlock, toBlock)
if err == nil {
break
}
lstnr.logger.Warn("Failed to process block range", zap.Error(err), zap.Int("try", i))
if i == maxRpcRetries-1 {
lstnr.logger.Error("Failed to process block range after max retries", zap.Error(err))
return
}
// Sleep before retrying with increasing backoff
time.Sleep(time.Duration(1+i) * time.Second)
}
}
done <- true
}

// Process Warp messages from the block range [fromBlock, toBlock], inclusive
func (lstnr *Listener) processBlockRange(
fromBlock, toBlock *big.Int,
) error {
for i := fromBlock.Int64(); i <= toBlock.Int64(); i++ {
header, err := lstnr.ethClient.HeaderByNumber(context.Background(), big.NewInt(i))
if err != nil {
lstnr.logger.Error(
"Failed to get header by number",
zap.String("blockchainID", lstnr.sourceBlockchain.BlockchainID),
zap.Error(err),
)
return err
}
lstnr.headersChan <- header
}
return nil
}
12 changes: 8 additions & 4 deletions relayer/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ava-labs/awm-relayer/relayer/config"
"github.com/ava-labs/awm-relayer/utils"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/subnet-evm/core/types"
"github.com/ava-labs/subnet-evm/ethclient"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand All @@ -37,6 +38,7 @@ type Listener struct {
healthStatus *atomic.Bool
ethClient ethclient.Client
messageCoordinator *MessageCoordinator
headersChan chan *types.Header
}

// runListener creates a Listener instance and the ApplicationRelayers for a subnet.
Expand Down Expand Up @@ -91,6 +93,7 @@ func newListener(
)
return nil, err
}
headersChan := make(chan *types.Header, maxClientSubscriptionBuffer)

ethWSClient, err := utils.NewEthClientWithConfig(
ctx,
Expand All @@ -106,7 +109,7 @@ func newListener(
)
return nil, err
}
sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient)
sub := vms.NewSubscriber(logger, config.ParseVM(sourceBlockchain.VM), blockchainID, ethWSClient, headersChan)

// Marks when the listener has finished the catch-up process on startup.
// Until that time, we do not know the order in which messages are processed,
Expand All @@ -133,6 +136,7 @@ func newListener(
healthStatus: relayerHealth,
ethClient: ethRPCClient,
messageCoordinator: messageCoordinator,
headersChan: headersChan,
}

// Open the subscription. We must do this before processing any missed messages, otherwise we may
Expand All @@ -148,8 +152,8 @@ func newListener(

// Process historical blocks in a separate goroutine so that the main processing loop can
// start processing new blocks as soon as possible. Otherwise, it's possible for
// ProcessFromHeight to overload the message queue and cause a deadlock.
go sub.ProcessFromHeight(big.NewInt(0).SetUint64(startingHeight), lstnr.catchUpResultChan)
// processFromHeight to overload the message queue and cause a deadlock.
go lstnr.processFromHeight(big.NewInt(0).SetUint64(startingHeight), catchUpResultChan)

return &lstnr, nil
}
Expand Down Expand Up @@ -191,7 +195,7 @@ func (lstnr *Listener) processLogs(ctx context.Context) error {
)
return fmt.Errorf("failed to catch up on historical blocks")
}
case blockHeader := <-lstnr.Subscriber.Headers():
case blockHeader := <-lstnr.headersChan:
go lstnr.messageCoordinator.ProcessBlock(blockHeader, lstnr.ethClient, errChan)
case err := <-lstnr.Subscriber.Err():
lstnr.healthStatus.Store(false)
Expand Down
15 changes: 9 additions & 6 deletions vms/evm/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ import (
)

const (
// Max buffer size for ethereum subscription channels
maxClientSubscriptionBuffer = 20000
subscribeRetryTimeout = 1 * time.Second
MaxBlocksPerRequest = 200
subscribeRetryTimeout = 1 * time.Second
MaxBlocksPerRequest = 200
)

// subscriber implements Subscriber
Expand All @@ -35,12 +33,17 @@ type subscriber struct {
}

// NewSubscriber returns a subscriber
func NewSubscriber(logger logging.Logger, blockchainID ids.ID, ethClient ethclient.Client) *subscriber {
func NewSubscriber(
logger logging.Logger,
blockchainID ids.ID,
ethClient ethclient.Client,
headersChan chan *types.Header,
) *subscriber {
return &subscriber{
blockchainID: blockchainID,
ethClient: ethClient,
logger: logger,
headers: make(chan *types.Header, maxClientSubscriptionBuffer),
headers: headersChan,
}
}

Expand Down
3 changes: 2 additions & 1 deletion vms/evm/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func makeSubscriberWithMockEthClient(t *testing.T) (*subscriber, *mock_ethclient
mockEthClient := mock_ethclient.NewMockClient(gomock.NewController(t))
blockchainID, err := ids.FromString(sourceSubnet.BlockchainID)
require.NoError(t, err)
subscriber := NewSubscriber(logger, blockchainID, mockEthClient)
headersChan := make(chan *types.Header, 200)
subscriber := NewSubscriber(logger, blockchainID, mockEthClient, headersChan)

return subscriber, mockEthClient
}
Expand Down
19 changes: 8 additions & 11 deletions vms/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package vms

import (
"math/big"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/awm-relayer/relayer/config"
Expand All @@ -18,18 +16,11 @@ import (
// channel returned by Logs() are assumed to be in block order. Logs within individual blocks
// may be in any order.
type Subscriber interface {
// ProcessFromHeight processes events from {height} to the latest block.
// Writes true to the channel on success, false on failure
ProcessFromHeight(height *big.Int, done chan bool)

// Subscribe registers a subscription. After Subscribe is called,
// log events that match [filter] are written to the channel returned
// by Logs
Subscribe(maxResubscribeAttempts int) error

// Headers returns the channel that the subscription writes block headers to
Headers() <-chan *types.Header

// Err returns the channel that the subscription writes errors to
// If an error is sent to this channel, the subscription should be closed
Err() <-chan error
Expand All @@ -39,10 +30,16 @@ type Subscriber interface {
}

// NewSubscriber returns a concrete Subscriber according to the VM specified by [subnetInfo]
func NewSubscriber(logger logging.Logger, vm config.VM, blockchainID ids.ID, ethClient ethclient.Client) Subscriber {
func NewSubscriber(
logger logging.Logger,
vm config.VM,
blockchainID ids.ID,
ethClient ethclient.Client,
headersChan chan *types.Header,
) Subscriber {
switch vm {
case config.EVM:
return evm.NewSubscriber(logger, blockchainID, ethClient)
return evm.NewSubscriber(logger, blockchainID, ethClient, headersChan)
default:
return nil
}
Expand Down

0 comments on commit 495cd08

Please sign in to comment.