Skip to content

Commit

Permalink
feat(relayer): Allow resync flag option to restart processing from bl…
Browse files Browse the repository at this point in the history
…ock 0 (#266)
  • Loading branch information
cyberhorsey authored Nov 15, 2022
1 parent 91d2434 commit 6b01cbe
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 33 deletions.
11 changes: 5 additions & 6 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ var (
}
)

// TODO: implement `resync` mode to wipe DB and restart from block 0
func Run(mode Mode, layer Layer) {
func Run(mode relayer.Mode, layer relayer.Layer) {
if err := loadAndValidateEnv(); err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -68,7 +67,7 @@ func Run(mode Mode, layer Layer) {

for _, i := range indexers {
go func(i *indexer.Service) {
if err := i.FilterThenSubscribe(context.Background()); err != nil {
if err := i.FilterThenSubscribe(context.Background(), mode); err != nil {
log.Fatal(err)
}
}(i)
Expand All @@ -77,7 +76,7 @@ func Run(mode Mode, layer Layer) {
<-forever
}

func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) {
func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), error) {
eventRepository, err := repo.NewEventRepository(db)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -110,7 +109,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)

indexers := make([]*indexer.Service, 0)

if layer == L1 || layer == Both {
if layer == relayer.L1 || layer == relayer.Both {
l1Indexer, err := indexer.NewService(indexer.NewServiceOpts{
EventRepo: eventRepository,
BlockRepo: blockRepository,
Expand All @@ -131,7 +130,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error)
indexers = append(indexers, l1Indexer)
}

if layer == L2 || layer == Both {
if layer == relayer.L2 || layer == relayer.Both {
l2Indexer, err := indexer.NewService(indexer.NewServiceOpts{
EventRepo: eventRepository,
BlockRepo: blockRepository,
Expand Down
10 changes: 5 additions & 5 deletions packages/relayer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

func main() {
modePtr := flag.String("mode", string(cli.SyncMode), `mode to run in.
modePtr := flag.String("mode", string(relayer.SyncMode), `mode to run in.
options:
sync: continue syncing from previous block
resync: restart syncing from block 0
fromBlock: restart syncing from specified block number
`)

layersPtr := flag.String("layers", string(cli.Both), `layers to watch and process.
layersPtr := flag.String("layers", string(relayer.Both), `layers to watch and process.
options:
l1: only watch l1 => l2 bridge messages
l2: only watch l2 => l1 bridge messages
Expand All @@ -25,13 +25,13 @@ func main() {

flag.Parse()

if !relayer.IsInSlice(cli.Mode(*modePtr), cli.Modes) {
if !relayer.IsInSlice(relayer.Mode(*modePtr), relayer.Modes) {
log.Fatal("mode not valid")
}

if !relayer.IsInSlice(cli.Layer(*layersPtr), cli.Layers) {
if !relayer.IsInSlice(relayer.Layer(*layersPtr), relayer.Layers) {
log.Fatal("mode not valid")
}

cli.Run(cli.Mode(*modePtr), cli.Layer(*layersPtr))
cli.Run(relayer.Mode(*modePtr), relayer.Layer(*layersPtr))
}
1 change: 1 addition & 0 deletions packages/relayer/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ var (
ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required")
ErrNoBridge = errors.Validation.NewWithKeyAndDetail("ERR_NO_BRIDGE", "Bridge is required")
ErrNoTaikoL2 = errors.Validation.NewWithKeyAndDetail("ERR_NO_TAIKO_L2", "TaikoL2 is required")
ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported")
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cli
package relayer

type Mode string

Expand Down
31 changes: 11 additions & 20 deletions packages/relayer/indexer/filter_then_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,34 @@ var (
// FilterThenSubscribe gets the most recent block height that has been indexed, and works it's way
// up to the latest block. As it goes, it tries to process messages.
// When it catches up, it then starts to Subscribe to latest events as they come in.
func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) error {
chainID, err := svc.ethClient.ChainID(ctx)
if err != nil {
return errors.Wrap(err, "s.ethClient.ChainID()")
return errors.Wrap(err, "svc.ethClient.ChainID()")
}

// get most recently processed block height from the DB
latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent(
eventName,
chainID,
)
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
if err := svc.setInitialProcessingBlockByMode(ctx, mode, chainID); err != nil {
return errors.Wrap(err, "svc.setInitialProcessingBlockByMode")
}

log.Infof("latest processed block: %v", latestProcessedBlock.Height)
log.Infof("processing from block height: %v", svc.processingBlock.Height)

if err != nil {
return errors.Wrap(err, "bridge.FilterMessageSent")
}

header, err := svc.ethClient.HeaderByNumber(ctx, nil)
if err != nil {
return errors.Wrap(err, "s.ethClient.HeaderByNumber")
return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
}

// if we have already done the latest block, exit early
// TODO: call SubscribeMessageSent, as we can now just watch the chain for new blocks
if latestProcessedBlock.Height == header.Number.Uint64() {
if svc.processingBlock.Height == header.Number.Uint64() {
log.Info("caught up, subscribing to new incoming events")
return svc.subscribe(ctx, chainID)
}

const batchSize = 1000

svc.processingBlock = latestProcessedBlock

log.Infof("getting events between %v and %v in batches of %v",
svc.processingBlock.Height,
header.Number.Int64(),
Expand All @@ -65,7 +57,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
// todo: parallelize/concurrently catch up. don't think we need to do this in order.
// use WaitGroup.
// we get a timeout/EOF if we don't batch.
for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += batchSize {
for i := svc.processingBlock.Height; i < header.Number.Uint64(); i += batchSize {
var end uint64 = svc.processingBlock.Height + batchSize
// if the end of the batch is greater than the latest block number, set end
// to the latest block number
Expand All @@ -76,7 +68,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
log.Infof("batch from %v to %v", i, end)

events, err := svc.bridge.FilterMessageSent(&bind.FilterOpts{
Start: latestProcessedBlock.Height + uint64(1),
Start: svc.processingBlock.Height,
End: &end,
Context: ctx,
}, nil)
Expand Down Expand Up @@ -117,7 +109,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error {
}

if svc.processingBlock.Height < latestBlock.Number.Uint64() {
return svc.FilterThenSubscribe(ctx)
return svc.FilterThenSubscribe(ctx, relayer.SyncMode)
}

return svc.subscribe(ctx, chainID)
Expand Down Expand Up @@ -207,7 +199,6 @@ func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *co
// we can now consider that previous block processed. save it to the DB
// and bump the block number.
if raw.BlockNumber > svc.processingBlock.Height {
log.Info("raw blockNumber > processingBlock.height")
log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber)

if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
Expand Down
10 changes: 9 additions & 1 deletion packages/relayer/indexer/service.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package indexer

import (
"context"
"crypto/ecdsa"
"math/big"

"github.com/cyberhorsey/errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
Expand All @@ -18,10 +21,15 @@ var (
ZeroAddress = common.HexToAddress("0x0000000000000000000000000000000000000000")
)

type ethClient interface {
ChainID(ctx context.Context) (*big.Int, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
}

type Service struct {
eventRepo relayer.EventRepository
blockRepo relayer.BlockRepository
ethClient *ethclient.Client
ethClient ethClient
destRPC *rpc.Client

processingBlock *relayer.Block
Expand Down
9 changes: 9 additions & 0 deletions packages/relayer/indexer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,22 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/taikochain/taiko-mono/packages/relayer"
"github.com/taikochain/taiko-mono/packages/relayer/mock"
"github.com/taikochain/taiko-mono/packages/relayer/repo"
"gopkg.in/go-playground/assert.v1"
)

var dummyEcdsaKey = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f"
var dummyAddress = "0x63FaC9201494f0bd17B9892B9fae4d52fe3BD377"

func newTestService() *Service {
return &Service{
blockRepo: &mock.BlockRepository{},
ethClient: &mock.EthClient{},

processingBlock: &relayer.Block{},
}
}
func Test_NewService(t *testing.T) {
tests := []struct {
name string
Expand Down
45 changes: 45 additions & 0 deletions packages/relayer/indexer/set_initial_processing_block_by_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package indexer

import (
"context"
"math/big"

"github.com/pkg/errors"
"github.com/taikochain/taiko-mono/packages/relayer"
)

func (svc *Service) setInitialProcessingBlockByMode(
ctx context.Context,
mode relayer.Mode,
chainID *big.Int,
) error {
switch mode {
case relayer.SyncMode:
// get most recently processed block height from the DB
latestProcessedBlock, err := svc.blockRepo.GetLatestBlockProcessedForEvent(
eventName,
chainID,
)
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
}

svc.processingBlock = latestProcessedBlock

return nil
case relayer.ResyncMode:
header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(0))
if err != nil {
return errors.Wrap(err, "s.blockRepo.GetLatestBlock()")
}

svc.processingBlock = &relayer.Block{
Height: header.Number.Uint64(),
Hash: header.Hash().Hex(),
}

return nil
default:
return relayer.ErrInvalidMode
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package indexer

import (
"context"
"math/big"
"testing"

"github.com/stretchr/testify/assert"
"github.com/taikochain/taiko-mono/packages/relayer"
"github.com/taikochain/taiko-mono/packages/relayer/mock"
)

func Test_SetInitialProcessingBlockByMode(t *testing.T) {
tests := []struct {
name string
mode relayer.Mode
chainID *big.Int
wantErr bool
wantHeight uint64
}{
{
"resync",
relayer.ResyncMode,
mock.MockChainID,
false,
0,
},
{
"sync",
relayer.SyncMode,
mock.MockChainID,
false,
mock.LatestBlock.Height,
},
{
"sync error getting latest block",
relayer.SyncMode,
big.NewInt(328938),
true,
0,
},
{
"invalidMode",
relayer.Mode("fake"),
mock.MockChainID,
true,
0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svc := newTestService()
err := svc.setInitialProcessingBlockByMode(
context.Background(),
tt.mode,
tt.chainID,
)

assert.Equal(t, tt.wantErr, err != nil)

assert.Equal(t, tt.wantHeight, svc.processingBlock.Height)
})
}
}
31 changes: 31 additions & 0 deletions packages/relayer/mock/block_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package mock

import (
"errors"
"math/big"

"github.com/taikochain/taiko-mono/packages/relayer"
)

var (
LatestBlock = &relayer.Block{
Height: 100,
Hash: "0x",
ChainID: MockChainID.Int64(),
}
)

type BlockRepository struct {
}

func (r *BlockRepository) Save(opts relayer.SaveBlockOpts) error {
return nil
}

func (r *BlockRepository) GetLatestBlockProcessedForEvent(eventName string, chainID *big.Int) (*relayer.Block, error) {
if chainID.Int64() != MockChainID.Int64() {
return nil, errors.New("error getting latest block processed for event")
}

return LatestBlock, nil
}
Loading

0 comments on commit 6b01cbe

Please sign in to comment.