Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(relayer): header sync check before processing messages #441

Merged
merged 6 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/relayer/.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ MYSQL_CONN_MAX_LIFETIME_IN_MS=
NUM_GOROUTINES=20
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
CONFIRMATIONS_BEFORE_PROCESSING=15
CORS_ORIGINS=*
CORS_ORIGINS=*
HEADER_SYNC_INTERVAL_IN_SECONDS=60
6 changes: 3 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ linters:

linters-settings:
funlen:
lines: 123
statements: 50
lines: 130
statements: 52
gocognit:
min-complexity: 37
min-complexity: 40

issues:
exclude-rules:
Expand Down
38 changes: 23 additions & 15 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ var (
"PROMETHEUS_HTTP_PORT",
}

defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
defaultHeaderSyncIntervalSeconds int = 60
)

func Run(
Expand Down Expand Up @@ -150,12 +151,17 @@ func makeIndexers(
var subscriptionBackoff time.Duration

subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS"))
if err != nil || numGoroutines <= 0 {
if err != nil || subscriptionBackoffInSeconds <= 0 {
subscriptionBackoff = defaultSubscriptionBackoff
} else {
subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
}

headerSyncIntervalInSeconds, err := strconv.Atoi(os.Getenv("HEADER_SYNC_INTERVAL_IN_SECONDS"))
if err != nil || headerSyncIntervalInSeconds <= 0 {
headerSyncIntervalInSeconds = defaultHeaderSyncIntervalSeconds
}

confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
if err != nil || confirmations <= 0 {
confirmations = defaultConfirmations
Expand Down Expand Up @@ -198,11 +204,12 @@ func makeIndexers(
DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),
SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds),
})
if err != nil {
log.Fatal(err)
Expand All @@ -225,11 +232,12 @@ func makeIndexers(
DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
ProfitableOnly: profitableOnly,
HeaderSyncIntervalInSeconds: int64(headerSyncIntervalInSeconds),
})
if err != nil {
log.Fatal(err)
Expand Down
56 changes: 29 additions & 27 deletions packages/relayer/indexer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ type Service struct {
}

type NewServiceOpts struct {
EventRepo relayer.EventRepository
BlockRepo relayer.BlockRepository
EthClient *ethclient.Client
DestEthClient *ethclient.Client
RPCClient *rpc.Client
DestRPCClient *rpc.Client
ECDSAKey string
BridgeAddress common.Address
DestBridgeAddress common.Address
SrcTaikoAddress common.Address
DestTaikoAddress common.Address
BlockBatchSize uint64
NumGoroutines int
SubscriptionBackoff time.Duration
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
EventRepo relayer.EventRepository
BlockRepo relayer.BlockRepository
EthClient *ethclient.Client
DestEthClient *ethclient.Client
RPCClient *rpc.Client
DestRPCClient *rpc.Client
ECDSAKey string
BridgeAddress common.Address
DestBridgeAddress common.Address
SrcTaikoAddress common.Address
DestTaikoAddress common.Address
BlockBatchSize uint64
NumGoroutines int
SubscriptionBackoff time.Duration
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
HeaderSyncIntervalInSeconds int64
}

func NewService(opts NewServiceOpts) (*Service, error) {
Expand Down Expand Up @@ -144,17 +145,18 @@ func NewService(opts NewServiceOpts) (*Service, error) {
}

processor, err := message.NewProcessor(message.NewProcessorOpts{
Prover: prover,
ECDSAKey: privateKey,
RPCClient: opts.RPCClient,
DestETHClient: opts.DestEthClient,
DestBridge: destBridge,
EventRepo: opts.EventRepo,
DestHeaderSyncer: destHeaderSyncer,
RelayerAddress: relayerAddr,
Confirmations: opts.Confirmations,
SrcETHClient: opts.EthClient,
ProfitableOnly: opts.ProfitableOnly,
Prover: prover,
ECDSAKey: privateKey,
RPCClient: opts.RPCClient,
DestETHClient: opts.DestEthClient,
DestBridge: destBridge,
EventRepo: opts.EventRepo,
DestHeaderSyncer: destHeaderSyncer,
RelayerAddress: relayerAddr,
Confirmations: opts.Confirmations,
SrcETHClient: opts.EthClient,
ProfitableOnly: opts.ProfitableOnly,
HeaderSyncIntervalSeconds: opts.HeaderSyncIntervalInSeconds,
})
if err != nil {
return nil, errors.Wrap(err, "message.NewProcessor")
Expand Down
13 changes: 3 additions & 10 deletions packages/relayer/indexer/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
"golang.org/x/sync/errgroup"
)

// subscribe subscribes to latest events
Expand All @@ -30,23 +29,17 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {

defer sub.Unsubscribe()

group, ctx := errgroup.WithContext(ctx)
cyberhorsey marked this conversation as resolved.
Show resolved Hide resolved

group.SetLimit(svc.numGoroutines)

for {
select {
case err := <-sub.Err():
return errors.Wrap(err, "sub.Err()")
case event := <-sink:
group.Go(func() error {
go func() {
err := svc.handleEvent(ctx, chainID, event)
if err != nil {
log.Errorf("svc.handleEvent: %v", err)
log.Errorf("svc.subscribe, svc.handleEvent: %v", err)
}

return nil
})
}()
}
}
}
11 changes: 5 additions & 6 deletions packages/relayer/message/process_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,17 @@ func (p *Processor) ProcessMessage(
return errors.Wrap(err, "p.waitForConfirmations")
}

if err := p.waitHeaderSynced(ctx, event); err != nil {
return errors.Wrap(err, "p.waitHeaderSynced")
}

// get latest synced header since not every header is synced from L1 => L2,
// and later blocks still have the storage trie proof from previous blocks.
latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{})
if err != nil {
return errors.Wrap(err, "taiko.GetSyncedHeader")
}

// if header hasnt been synced, we are unable to process this message
if common.BytesToHash(latestSyncedHeader[:]).Hex() == relayer.ZeroHash.Hex() {
log.Infof("header not synced, bailing")
return nil
}

hashed := crypto.Keccak256(
event.Raw.Address.Bytes(), // L1 bridge address
event.Signal[:],
Expand All @@ -69,6 +67,7 @@ func (p *Processor) ProcessMessage(

// message will fail when we try to process it
if !received {
log.Warnf("signal %v not received on dest chain", common.Hash(event.Signal).Hex())
return errors.New("message not received")
}

Expand Down
30 changes: 17 additions & 13 deletions packages/relayer/message/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ethClient interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
BlockNumber(ctx context.Context) (uint64, error)
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
}
type Processor struct {
eventRepo relayer.EventRepository
Expand All @@ -35,21 +36,23 @@ type Processor struct {
relayerAddr common.Address
confirmations uint64

profitableOnly relayer.ProfitableOnly
profitableOnly relayer.ProfitableOnly
headerSyncIntervalSeconds int64
}

type NewProcessorOpts struct {
Prover *proof.Prover
ECDSAKey *ecdsa.PrivateKey
RPCClient relayer.Caller
SrcETHClient ethClient
DestETHClient ethClient
DestBridge relayer.Bridge
EventRepo relayer.EventRepository
DestHeaderSyncer relayer.HeaderSyncer
RelayerAddress common.Address
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
Prover *proof.Prover
ECDSAKey *ecdsa.PrivateKey
RPCClient relayer.Caller
SrcETHClient ethClient
DestETHClient ethClient
DestBridge relayer.Bridge
EventRepo relayer.EventRepository
DestHeaderSyncer relayer.HeaderSyncer
RelayerAddress common.Address
Confirmations uint64
ProfitableOnly relayer.ProfitableOnly
HeaderSyncIntervalSeconds int64
}

func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
Expand Down Expand Up @@ -107,6 +110,7 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
relayerAddr: opts.RelayerAddress,
confirmations: opts.Confirmations,

profitableOnly: opts.ProfitableOnly,
profitableOnly: opts.ProfitableOnly,
headerSyncIntervalSeconds: opts.HeaderSyncIntervalSeconds,
}, nil
}
21 changes: 11 additions & 10 deletions packages/relayer/message/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ func newTestProcessor(profitableOnly relayer.ProfitableOnly) *Processor {
)

return &Processor{
eventRepo: &mock.EventRepository{},
destBridge: &mock.Bridge{},
srcEthClient: &mock.EthClient{},
destEthClient: &mock.EthClient{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
destHeaderSyncer: &mock.HeaderSyncer{},
prover: prover,
rpc: &mock.Caller{},
profitableOnly: profitableOnly,
eventRepo: &mock.EventRepository{},
destBridge: &mock.Bridge{},
srcEthClient: &mock.EthClient{},
destEthClient: &mock.EthClient{},
mu: &sync.Mutex{},
ecdsaKey: privateKey,
destHeaderSyncer: &mock.HeaderSyncer{},
prover: prover,
rpc: &mock.Caller{},
profitableOnly: profitableOnly,
headerSyncIntervalSeconds: 1,
}
}
func Test_NewProcessor(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion packages/relayer/message/wait_for_confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
// TODO: make timeout a config var
ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute)
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute)

defer cancelFunc()

Expand Down
55 changes: 55 additions & 0 deletions packages/relayer/message/wait_header_synced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package message

import (
"context"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
)

func (p *Processor) waitHeaderSynced(ctx context.Context, event *contracts.BridgeMessageSent) error {
ticker := time.NewTicker(time.Duration(p.headerSyncIntervalSeconds) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
// get latest synced header since not every header is synced from L1 => L2,
// and later blocks still have the storage trie proof from previous blocks.
latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{})
if err != nil {
return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader")
}

block, err := p.srcEthClient.BlockByHash(ctx, latestSyncedHeader)
cyberhorsey marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrap(err, "p.destHeaderSyncer.GetLatestSyncedHeader")
}

// header is caught up and processible
if block.NumberU64() >= event.Raw.BlockNumber {
log.Infof(
"signal: %v is processable. occured in block %v, latestSynced is block %v",
common.Hash(event.Signal).Hex(),
event.Raw.BlockNumber,
block.NumberU64(),
)

return nil
}

log.Infof(
"signal: %v waiting to be processable. occured in block %v, latestSynced is block %v",
common.Hash(event.Signal).Hex(),
event.Raw.BlockNumber,
block.NumberU64(),
)
}
}
}
21 changes: 21 additions & 0 deletions packages/relayer/message/wait_header_synced_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package message

import (
"context"
"testing"

"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
"github.com/taikoxyz/taiko-mono/packages/relayer/contracts"
)

func Test_waitHeaderSynced(t *testing.T) {
p := newTestProcessor(true)

err := p.waitHeaderSynced(context.TODO(), &contracts.BridgeMessageSent{
Raw: types.Log{
BlockNumber: 1,
},
})
assert.Nil(t, err)
}
Loading