diff --git a/go.mod b/go.mod
index ff09077ea3..f388a52a26 100644
--- a/go.mod
+++ b/go.mod
@@ -62,6 +62,7 @@ require (
 	go.opencensus.io v0.23.0 // indirect
 	golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
 	golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
+	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
 	google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
 	google.golang.org/grpc v1.47.0 // indirect
diff --git a/go.sum b/go.sum
index cedf6ecb41..88f1db973f 100644
--- a/go.sum
+++ b/go.sum
@@ -970,6 +970,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env
index 202ac92040..948eb9fb34 100644
--- a/packages/relayer/.default.env
+++ b/packages/relayer/.default.env
@@ -4,8 +4,16 @@ MYSQL_PASSWORD=root
 MYSQL_DATABASE=relayer
 MYSQL_HOST=localhost:3306
 RELAYER_ECDSA_KEY=
-L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
-L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
-L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_"
-L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws"
+L1_BRIDGE_ADDRESS=0xB12d6112D64B213880Fa53F815aF1F29c91CaCe9
+L2_BRIDGE_ADDRESS=0x4eA05A0f7713333AeB4bB73F17aEeFE146CF13E3
+L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D
+L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4
+L1_RPC_URL=ws://34.132.67.34:8546
+L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz
+BLOCK_BATCH_SIZE=2
+MYSQL_MAX_IDLE_CONNS=
+MYSQL_MAX_OPEN_CONNS=
+MYSQL_CONN_MAX_LIFETIME_IN_MS=
+NUM_GOROUTINES=20
+SUBSCRIPTION_BACKOFF_IN_SECONDS=3
 CONFIRMATIONS_BEFORE_PROCESSING=15
\ No newline at end of file
diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml
index 2a77b22d86..b2008b7e90 100644
--- a/packages/relayer/.golangci.yml
+++ b/packages/relayer/.golangci.yml
@@ -19,16 +19,17 @@ linters:
   - gofmt
   - golint
   - gosec
+  - gosimple
   - lll
   - whitespace
   - wsl
 
 linters-settings:
   funlen:
-    lines: 105
-    statements: 45
+    lines: 116
+    statements: 48
   gocognit:
-    min-complexity: 32
+    min-complexity: 35
 
 issues:
   exclude-rules:
diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go
index 8c87b4cb42..f2e8e9aac8 100644
--- a/packages/relayer/cli/cli.go
+++ b/packages/relayer/cli/cli.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"strconv"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/rpc"
@@ -36,7 +37,10 @@ var (
 		"CONFIRMATIONS_BEFORE_PROCESSING",
 	}
 
-	defaultConfirmations = 15
+	defaultBlockBatchSize      = 2
+	defaultNumGoroutines       = 10
+	defaultSubscriptionBackoff = 2 * time.Second
+	defaultConfirmations       = 15
 )
 
 func Run(mode relayer.Mode, layer relayer.Layer) {
@@ -111,6 +115,25 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
 		return nil, nil, err
 	}
 
+	blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE"))
+	if err != nil || blockBatchSize <= 0 {
+		blockBatchSize = defaultBlockBatchSize
+	}
+
+	numGoroutines, err := strconv.Atoi(os.Getenv("NUM_GOROUTINES"))
+	if err != nil || numGoroutines <= 0 {
+		numGoroutines = defaultNumGoroutines
+	}
+
+	var subscriptionBackoff time.Duration
+
+	subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS"))
+	if err != nil || subscriptionBackoffInSeconds <= 0 {
+		subscriptionBackoff = defaultSubscriptionBackoff
+	} else {
+		subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
+	}
+
 	confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
 	if err != nil || confirmations <= 0 {
 		confirmations = defaultConfirmations
@@ -132,7 +155,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
 		DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")),
 		DestTaikoAddress:  common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),
 
-		Confirmations: uint64(confirmations),
+		BlockBatchSize:      uint64(blockBatchSize),
+		NumGoroutines:       numGoroutines,
+		SubscriptionBackoff: subscriptionBackoff,
+		Confirmations:       uint64(confirmations),
 	})
 	if err != nil {
 		log.Fatal(err)
 	}
@@ -155,7 +181,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
 		DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
 		DestTaikoAddress:  common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),
 
-		Confirmations: uint64(confirmations),
+		BlockBatchSize:      uint64(blockBatchSize),
+		NumGoroutines:       numGoroutines,
+		SubscriptionBackoff: subscriptionBackoff,
+		Confirmations:       uint64(confirmations),
 	})
 	if err != nil {
 		log.Fatal(err)
 	}
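Note: the same strconv.Atoi-with-fallback pattern now appears four times in makeIndexers and three more times in openDBConnection below. It could be collapsed into one helper in cli.go; this sketch is not part of the diff and envIntOr is a hypothetical name:

    // envIntOr returns the value of an integer environment variable, falling
    // back to def when the variable is unset, non-numeric, or non-positive --
    // the same fallback rule the inline checks above apply.
    func envIntOr(key string, def int) int {
    	v, err := strconv.Atoi(os.Getenv(key))
    	if err != nil || v <= 0 {
    		return def
    	}

    	return v
    }

With it, each lookup becomes a one-liner, e.g. blockBatchSize := envIntOr("BLOCK_BATCH_SIZE", defaultBlockBatchSize).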
@@ -200,6 +229,45 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB {
 		log.Fatal(err)
 	}
 
+	sqlDB, err := db.DB()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	var (
+		defaultMaxIdleConns    = 50
+		defaultMaxOpenConns    = 200
+		defaultConnMaxLifetime = 10 * time.Second
+	)
+
+	maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS"))
+	if err != nil || maxIdleConns <= 0 {
+		maxIdleConns = defaultMaxIdleConns
+	}
+
+	maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS"))
+	if err != nil || maxOpenConns <= 0 {
+		maxOpenConns = defaultMaxOpenConns
+	}
+
+	var maxLifetime time.Duration
+
+	connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS"))
+	if err != nil || connMaxLifetime <= 0 {
+		maxLifetime = defaultConnMaxLifetime
+	} else {
+		// the env var is in milliseconds; a bare time.Duration(n) would be nanoseconds
+		maxLifetime = time.Duration(connMaxLifetime) * time.Millisecond
+	}
+
+	// SetMaxOpenConns sets the maximum number of open connections to the database.
+	sqlDB.SetMaxOpenConns(maxOpenConns)
+
+	// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
+	sqlDB.SetMaxIdleConns(maxIdleConns)
+
+	// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
+	sqlDB.SetConnMaxLifetime(maxLifetime)
+
 	return db
 }
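The pool limits above matter because the indexer now handles events on up to NUM_GOROUTINES goroutines per indexer, each of which may hold a database connection (the TODO in filter_then_subscribe.go below raises the same concern). Two properties of database/sql are worth keeping in mind when tuning them: the idle limit is capped at the open limit, and the connection lifetime should stay below MySQL's wait_timeout so the driver retires connections before the server drops them. A standalone sketch of the same knobs, using the defaults above and an illustrative DSN:

    package main

    import (
    	"database/sql"
    	"time"

    	_ "github.com/go-sql-driver/mysql"
    )

    func main() {
    	db, err := sql.Open("mysql", "root:root@tcp(localhost:3306)/relayer")
    	if err != nil {
    		panic(err)
    	}

    	db.SetMaxOpenConns(200) // hard cap on concurrent connections
    	db.SetMaxIdleConns(50)  // kept-warm connections; capped at MaxOpenConns

    	// Retire connections before the MySQL server's wait_timeout closes them.
    	db.SetConnMaxLifetime(10 * time.Second)
    }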
diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go
index f3487e7028..8ffdabd6fc 100644
--- a/packages/relayer/indexer/filter_then_subscribe.go
+++ b/packages/relayer/indexer/filter_then_subscribe.go
@@ -2,15 +2,12 @@ package indexer
 
 import (
 	"context"
-	"encoding/json"
-	"math/big"
 
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
-	"github.com/ethereum/go-ethereum/common"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"github.com/taikochain/taiko-mono/packages/relayer"
-	"github.com/taikochain/taiko-mono/packages/relayer/contracts"
+	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -30,12 +27,6 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode)
 		return errors.Wrap(err, "svc.setInitialProcessingBlockByMode")
 	}
 
-	log.Infof("processing from block height: %v", svc.processingBlock.Height)
-
-	if err != nil {
-		return errors.Wrap(err, "bridge.FilterMessageSent")
-	}
-
 	header, err := svc.ethClient.HeaderByNumber(ctx, nil)
 	if err != nil {
 		return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
@@ -46,19 +37,14 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode)
 		return svc.subscribe(ctx, chainID)
 	}
 
-	const batchSize = 1000
-
 	log.Infof("getting events between %v and %v in batches of %v",
 		svc.processingBlock.Height,
 		header.Number.Int64(),
-		batchSize,
+		svc.blockBatchSize,
 	)
 
-	// todo: parallelize/concurrently catch up. don't think we need to do this in order.
-	// use WaitGroup.
-	// we get a timeout/EOF if we don't batch.
-	for i := svc.processingBlock.Height; i < header.Number.Uint64(); i += batchSize {
-		var end uint64 = svc.processingBlock.Height + batchSize
+	for i := svc.processingBlock.Height; i < header.Number.Uint64(); i += svc.blockBatchSize {
+		end := svc.processingBlock.Height + svc.blockBatchSize
 		// if the end of the batch is greater than the latest block number, set end
 		// to the latest block number
 		if end > header.Number.Uint64() {
@@ -84,14 +70,29 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode)
 			continue
 		}
 
-		log.Info("found events")
+		group, ctx := errgroup.WithContext(ctx)
+		group.SetLimit(svc.numGoroutines)
+
+		// TODO: do we want to limit the number of possible goroutines in the waitgroup?
+		// right now it is dependent on how many events are found in the
+		// block range. the main concern would be exceeding DB connection pooling limits.
 
 		for {
-			if err := svc.handleEvent(ctx, chainID, events.Event); err != nil {
-				return errors.Wrap(err, "svc.handleEvent")
-			}
+			// copy the current event before events.Next() advances the iterator,
+			// so the goroutine below does not race with the loop.
+			event := events.Event
+
+			group.Go(func() error {
+				err := svc.handleEvent(ctx, chainID, event)
+				if err != nil {
+					// log error but always return nil to keep other goroutines active
+					log.Error(err.Error())
+				}
+
+				return nil
+			})
 
 			if !events.Next() {
+				if err := group.Wait(); err != nil {
+					return errors.Wrap(err, "group.Wait")
+				}
+
 				if err := svc.handleNoEventsRemaining(ctx, chainID, events); err != nil {
 					return errors.Wrap(err, "svc.handleNoEventsRemaining")
 				}
@@ -114,163 +115,3 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode)
 
 	return svc.subscribe(ctx, chainID)
 }
-
-// subscribe subscribes to latest events
-func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
-	sink := make(chan *contracts.BridgeMessageSent)
-
-	sub, err := svc.bridge.WatchMessageSent(&bind.WatchOpts{}, sink, nil)
-	if err != nil {
-		return errors.Wrap(err, "svc.bridge.WatchMessageSent")
-	}
-
-	defer sub.Unsubscribe()
-
-	for {
-		select {
-		case err := <-sub.Err():
-			return err
-		case event := <-sink:
-			if err := svc.handleEvent(ctx, chainID, event); err != nil {
-				return errors.Wrap(err, "svc.handleEvent")
-			}
-		}
-	}
-}
-
-// handleEvent handles an individual MessageSent event
-func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *contracts.BridgeMessageSent) error {
-	log.Infof("event found. signal:%v for block: %v", common.Hash(event.Signal).Hex(), event.Raw.BlockNumber)
-
-	marshaled, err := json.Marshal(event)
-	if err != nil {
-		return errors.Wrap(err, "json.Marshal(event)")
-	}
-
-	raw := event.Raw
-
-	// handle chain re-org by checking Removed property, no need to
-	// return error, just continue and do not process.
-	if raw.Removed {
-		return nil
-	}
-
-	// save event to database for later processing outside
-	// the indexer
-	log.Info("saving event to database")
-
-	eventStatus := relayer.EventStatusNew
-	// if gasLimit is 0, relayer can not process this.
-	if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 {
-		eventStatus = relayer.EventStatusNewOnlyOwner
-	}
-
-	e, err := svc.eventRepo.Save(relayer.SaveEventOpts{
-		Name:    eventName,
-		Data:    string(marshaled),
-		ChainID: chainID,
-		Status:  eventStatus,
-	})
-	if err != nil {
-		return errors.Wrap(err, "svc.eventRepo.Save")
-	}
-
-	// we can not process, exit early
-	if eventStatus == relayer.EventStatusNewOnlyOwner && event.Message.Owner != svc.relayerAddr {
-		log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop")
-		return nil
-	}
-
-	messageStatus, err := svc.destBridge.GetMessageStatus(nil, event.Signal)
-	if err != nil {
-		return errors.Wrap(err, "svc.destBridge.GetMessageStatus")
-	}
-
-	if messageStatus == uint8(relayer.EventStatusNew) {
-		log.Info("message not processed yet, attempting processing")
-		// process the message
-		if err := svc.processor.ProcessMessage(ctx, event, e); err != nil {
-			// TODO: handle error here, update in eventRepo, continue on in processing
-			return errors.Wrap(err, "svc.processMessage")
-		}
-	}
-
-	// if the block number of the event is higher than the block we are processing,
-	// we can now consider that previous block processed. save it to the DB
-	// and bump the block number.
-	if raw.BlockNumber > svc.processingBlock.Height {
-		log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber)
-
-		if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
-			Height:    svc.processingBlock.Height,
-			Hash:      common.HexToHash(svc.processingBlock.Hash),
-			ChainID:   chainID,
-			EventName: eventName,
-		}); err != nil {
-			return errors.Wrap(err, "svc.blockRepo.Save")
-		}
-
-		svc.processingBlock = &relayer.Block{
-			Height: raw.BlockNumber,
-			Hash:   raw.BlockHash.Hex(),
-		}
-	}
-
-	return nil
-}
-
-// handleNoEventsRemaining is used when the batch had events, but is now finished, and we need to
-// update the latest block processed
-func (svc *Service) handleNoEventsRemaining(
-	ctx context.Context,
-	chainID *big.Int,
-	events *contracts.BridgeMessageSentIterator,
-) error {
-	log.Info("no events remaining to be processed")
-
-	if events.Error() != nil {
-		return errors.Wrap(events.Error(), "events.Error")
-	}
-
-	log.Infof("saving new latest processed block to DB: %v", events.Event.Raw.BlockNumber)
-
-	if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
-		Height:    events.Event.Raw.BlockNumber,
-		Hash:      events.Event.Raw.BlockHash,
-		ChainID:   chainID,
-		EventName: eventName,
-	}); err != nil {
-		return errors.Wrap(err, "svc.blockRepo.Save")
-	}
-
-	return nil
-}
-
-// handleNoEventsInBatch is used when an entire batch call has no events in the entire response,
-// and we need to update the latest block processed
-func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, blockNumber int64) error {
-	log.Infof("no events in batch")
-
-	header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(blockNumber))
-	if err != nil {
-		return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
-	}
-
-	log.Infof("setting last processed block to height: %v, hash: %v", blockNumber, header.Hash().Hex())
-
-	if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
-		Height:    uint64(blockNumber),
-		Hash:      header.Hash(),
-		ChainID:   chainID,
-		EventName: eventName,
-	}); err != nil {
-		return errors.Wrap(err, "svc.blockRepo.Save")
-	}
-
-	svc.processingBlock = &relayer.Block{
-		Height: uint64(blockNumber),
-		Hash:   header.Hash().Hex(),
-	}
-
-	return nil
-}
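The batch loop above replaces sequential event handling with a bounded worker pool: group.SetLimit(svc.numGoroutines) makes group.Go block once that many handlers are in flight, and group.Wait drains the batch before the block pointer is persisted. (This is also why each event is copied into a local before group.Go: events.Next() overwrites events.Event, so a closure reading the iterator directly would race with the loop.) A minimal, self-contained sketch of the pattern:

    package main

    import (
    	"context"
    	"fmt"

    	"golang.org/x/sync/errgroup"
    )

    func main() {
    	group, _ := errgroup.WithContext(context.Background())

    	// At most 3 handlers run concurrently; further Go calls block
    	// until a slot frees up.
    	group.SetLimit(3)

    	for i := 0; i < 10; i++ {
    		i := i // copy the loop variable so each goroutine sees its own value

    		group.Go(func() error {
    			fmt.Println("handling event", i)
    			return nil
    		})
    	}

    	// Wait returns once every handler has finished.
    	if err := group.Wait(); err != nil {
    		fmt.Println("batch failed:", err)
    	}
    }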
diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go
new file mode 100644
index 0000000000..bd98caabc8
--- /dev/null
+++ b/packages/relayer/indexer/handle_event.go
@@ -0,0 +1,125 @@
+package indexer
+
+import (
+	"context"
+	"encoding/json"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/taikochain/taiko-mono/packages/relayer"
+	"github.com/taikochain/taiko-mono/packages/relayer/contracts"
+)
+
+// handleEvent handles an individual MessageSent event
+func (svc *Service) handleEvent(
+	ctx context.Context,
+	chainID *big.Int,
+	event *contracts.BridgeMessageSent,
+) error {
+	raw := event.Raw
+
+	// handle chain re-org by checking Removed property, no need to
+	// return error, just continue and do not process.
+	if raw.Removed {
+		return nil
+	}
+
+	eventStatus, err := svc.eventStatusFromSignal(ctx, event.Message.GasLimit, event.Signal)
+	if err != nil {
+		return errors.Wrap(err, "svc.eventStatusFromSignal")
+	}
+
+	marshaled, err := json.Marshal(event)
+	if err != nil {
+		return errors.Wrap(err, "json.Marshal(event)")
+	}
+
+	e, err := svc.eventRepo.Save(relayer.SaveEventOpts{
+		Name:    eventName,
+		Data:    string(marshaled),
+		ChainID: chainID,
+		Status:  eventStatus,
+	})
+	if err != nil {
+		return errors.Wrap(err, "svc.eventRepo.Save")
+	}
+
+	if !canProcessMessage(ctx, eventStatus, event.Message.Owner, svc.relayerAddr) {
+		return nil
+	}
+
+	// process the message
+	if err := svc.processor.ProcessMessage(ctx, event, e); err != nil {
+		return errors.Wrap(err, "svc.processMessage")
+	}
+
+	// if the block number of the event is higher than the block we are processing,
+	// we can now consider that previous block processed. save it to the DB
+	// and bump the block number.
+	if raw.BlockNumber > svc.processingBlock.Height {
+		log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber)
+
+		if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
+			Height:    svc.processingBlock.Height,
+			Hash:      common.HexToHash(svc.processingBlock.Hash),
+			ChainID:   chainID,
+			EventName: eventName,
+		}); err != nil {
+			return errors.Wrap(err, "svc.blockRepo.Save")
+		}
+
+		svc.processingBlock = &relayer.Block{
+			Height: raw.BlockNumber,
+			Hash:   raw.BlockHash.Hex(),
+		}
+	}
+
+	return nil
+}
+
+func canProcessMessage(
+	ctx context.Context,
+	eventStatus relayer.EventStatus,
+	messageOwner common.Address,
+	relayerAddress common.Address,
+) bool {
+	// we can not process, exit early
+	if eventStatus == relayer.EventStatusNewOnlyOwner {
+		if messageOwner != relayerAddress {
+			log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop")
+			return false
+		}
+
+		return true
+	}
+
+	if eventStatus == relayer.EventStatusNew {
+		return true
+	}
+
+	return false
+}
+
+func (svc *Service) eventStatusFromSignal(
+	ctx context.Context,
+	gasLimit *big.Int,
+	signal [32]byte,
+) (relayer.EventStatus, error) {
+	var eventStatus relayer.EventStatus
+
+	// if gasLimit is 0, relayer can not process this.
+	if gasLimit == nil || gasLimit.Cmp(common.Big0) == 0 {
+		eventStatus = relayer.EventStatusNewOnlyOwner
+	} else {
+		messageStatus, err := svc.destBridge.GetMessageStatus(nil, signal)
+		if err != nil {
+			return 0, errors.Wrap(err, "svc.destBridge.GetMessageStatus")
+		}
+
+		eventStatus = relayer.EventStatus(messageStatus)
+	}
+
+	return eventStatus, nil
+}
diff --git a/packages/relayer/indexer/handle_event_test.go b/packages/relayer/indexer/handle_event_test.go
new file mode 100644
index 0000000000..dba884f22d
--- /dev/null
+++ b/packages/relayer/indexer/handle_event_test.go
@@ -0,0 +1,73 @@
+package indexer
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/taikochain/taiko-mono/packages/relayer"
+	"gopkg.in/go-playground/assert.v1"
+)
+
+var (
+	relayerAddr = common.HexToAddress("0x71C7656EC7ab88b098defB751B7401B5f6d8976F")
+)
+
+func Test_canProcessMessage(t *testing.T) {
+	tests := []struct {
+		name           string
+		eventStatus    relayer.EventStatus
+		messageOwner   common.Address
+		relayerAddress common.Address
+		want           bool
+	}{
+		{
+			"canProcess, eventStatusNew",
+			relayer.EventStatusNew,
+			relayerAddr,
+			relayerAddr,
+			true,
+		},
+		{
+			"cantProcess, eventStatusDone",
+			relayer.EventStatusDone,
+			relayerAddr,
+			relayerAddr,
+			false,
+		},
+		{
+			"cantProcess, eventStatusRetriable",
+			relayer.EventStatusRetriable,
+			relayerAddr,
+			relayerAddr,
+			false,
+		},
+		{
+			"cantProcess, eventStatusNewOnlyOwner and relayer is not owner",
+			relayer.EventStatusNewOnlyOwner,
+			common.HexToAddress("0x"),
+			relayerAddr,
+			false,
+		},
+		{
+			"canProcess, eventStatusOnlyOwner and relayer address is owner",
+			relayer.EventStatusNewOnlyOwner,
+			relayerAddr,
+			relayerAddr,
+			true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			canProcess := canProcessMessage(
+				context.Background(),
+				tt.eventStatus,
+				tt.messageOwner,
+				tt.relayerAddress,
+			)
+
+			assert.Equal(t, tt.want, canProcess)
+		})
+	}
+}
diff --git a/packages/relayer/indexer/handle_no_events_in_batch.go b/packages/relayer/indexer/handle_no_events_in_batch.go
new file mode 100644
index 0000000000..d16a9df696
--- /dev/null
+++ b/packages/relayer/indexer/handle_no_events_in_batch.go
@@ -0,0 +1,39 @@
+package indexer
+
+import (
+	"context"
+	"math/big"
+
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/taikochain/taiko-mono/packages/relayer"
+)
+
+// handleNoEventsInBatch is used when an entire batch call has no events in the entire response,
+// and we need to update the latest block processed
+func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, blockNumber int64) error {
+	log.Infof("no events in batch")
+
+	header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(blockNumber))
+	if err != nil {
+		return errors.Wrap(err, "svc.ethClient.HeaderByNumber")
+	}
+
+	log.Infof("setting last processed block to height: %v, hash: %v", blockNumber, header.Hash().Hex())
+
+	if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
+		Height:    uint64(blockNumber),
+		Hash:      header.Hash(),
+		ChainID:   chainID,
+		EventName: eventName,
+	}); err != nil {
+		return errors.Wrap(err, "svc.blockRepo.Save")
+	}
+
+	svc.processingBlock = &relayer.Block{
+		Height: uint64(blockNumber),
+		Hash:   header.Hash().Hex(),
+	}
+
+	return nil
+}
diff --git a/packages/relayer/indexer/handle_no_events_remaining.go b/packages/relayer/indexer/handle_no_events_remaining.go
new file mode 100644
index 0000000000..e125b29421
--- /dev/null
+++ b/packages/relayer/indexer/handle_no_events_remaining.go
@@ -0,0 +1,38 @@
+package indexer
+
+import (
+	"context"
+	"math/big"
+
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/taikochain/taiko-mono/packages/relayer"
+	"github.com/taikochain/taiko-mono/packages/relayer/contracts"
+)
+
+// handleNoEventsRemaining is used when the batch had events, but is now finished, and we need to
+// update the latest block processed
+func (svc *Service) handleNoEventsRemaining(
+	ctx context.Context,
+	chainID *big.Int,
+	events *contracts.BridgeMessageSentIterator,
+) error {
+	log.Info("no events remaining to be processed")
+
+	if events.Error() != nil {
+		return errors.Wrap(events.Error(), "events.Error")
+	}
+
+	log.Infof("saving new latest processed block to DB: %v", events.Event.Raw.BlockNumber)
+
+	if err := svc.blockRepo.Save(relayer.SaveBlockOpts{
+		Height:    events.Event.Raw.BlockNumber,
+		Hash:      events.Event.Raw.BlockHash,
+		ChainID:   chainID,
+		EventName: eventName,
+	}); err != nil {
+		return errors.Wrap(err, "svc.blockRepo.Save")
+	}
+
+	return nil
+}
diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go
index da60d17b31..dd5a8b847f 100644
--- a/packages/relayer/indexer/service.go
+++ b/packages/relayer/indexer/service.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/ecdsa"
 	"math/big"
+	"time"
 
 	"github.com/cyberhorsey/errors"
 	"github.com/ethereum/go-ethereum/common"
@@ -40,20 +41,29 @@ type Service struct {
 	processor *message.Processor
 
 	relayerAddr common.Address
+
+	errChan chan error
+
+	blockBatchSize      uint64
+	numGoroutines       int
+	subscriptionBackoff time.Duration
 }
 
 type NewServiceOpts struct {
-	EventRepo         relayer.EventRepository
-	BlockRepo         relayer.BlockRepository
-	EthClient         *ethclient.Client
-	DestEthClient     *ethclient.Client
-	RPCClient         *rpc.Client
-	DestRPCClient     *rpc.Client
-	ECDSAKey          string
-	BridgeAddress     common.Address
-	DestBridgeAddress common.Address
-	DestTaikoAddress  common.Address
-	Confirmations     uint64
+	EventRepo           relayer.EventRepository
+	BlockRepo           relayer.BlockRepository
+	EthClient           *ethclient.Client
+	DestEthClient       *ethclient.Client
+	RPCClient           *rpc.Client
+	DestRPCClient       *rpc.Client
+	ECDSAKey            string
+	BridgeAddress       common.Address
+	DestBridgeAddress   common.Address
+	DestTaikoAddress    common.Address
+	BlockBatchSize      uint64
+	NumGoroutines       int
+	SubscriptionBackoff time.Duration
+	Confirmations       uint64
 }
 
 func NewService(opts NewServiceOpts) (*Service, error) {
@@ -131,6 +141,7 @@ func NewService(opts NewServiceOpts) (*Service, error) {
 		DestBridge:       destBridge,
 		EventRepo:        opts.EventRepo,
 		DestHeaderSyncer: destHeaderSyncer,
+		RelayerAddress:   relayerAddr,
 		Confirmations:    opts.Confirmations,
 		SrcETHClient:     opts.EthClient,
 	})
@@ -150,5 +161,11 @@ func NewService(opts NewServiceOpts) (*Service, error) {
 
 		processor:   processor,
 		relayerAddr: relayerAddr,
+
+		errChan: make(chan error),
+
+		blockBatchSize:      opts.BlockBatchSize,
+		numGoroutines:       opts.NumGoroutines,
+		subscriptionBackoff: opts.SubscriptionBackoff,
 	}, nil
 }
diff --git a/packages/relayer/indexer/service_test.go b/packages/relayer/indexer/service_test.go
index 8f6b22bcef..d6cfb99d84 100644
--- a/packages/relayer/indexer/service_test.go
+++ b/packages/relayer/indexer/service_test.go
@@ -23,6 +23,7 @@ func newTestService() *Service {
 		processingBlock: &relayer.Block{},
 	}
 }
+
 func Test_NewService(t *testing.T) {
 	tests := []struct {
 		name string
diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go
new file mode 100644
index 0000000000..5dd65a3e4e
--- /dev/null
+++ b/packages/relayer/indexer/subscribe.go
@@ -0,0 +1,50 @@
+package indexer
+
+import (
+	"context"
+	"math/big"
+
+	"github.com/ethereum/go-ethereum/accounts/abi/bind"
+	"github.com/ethereum/go-ethereum/event"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+	"github.com/taikochain/taiko-mono/packages/relayer/contracts"
+	"golang.org/x/sync/errgroup"
+)
+
+// subscribe subscribes to latest events
+func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error {
+	sink := make(chan *contracts.BridgeMessageSent)
+
+	sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) {
+		if err != nil {
+			log.Errorf("svc.bridge.WatchMessageSent: %v", err)
+		}
+
+		return svc.bridge.WatchMessageSent(&bind.WatchOpts{
+			Context: ctx,
+		}, sink, nil)
+	})
+
+	defer sub.Unsubscribe()
+
+	group, ctx := errgroup.WithContext(ctx)
+
+	group.SetLimit(svc.numGoroutines)
+
+	for {
+		select {
+		case err := <-sub.Err():
+			return errors.Wrap(err, "sub.Err()")
+		case event := <-sink:
+			group.Go(func() error {
+				err := svc.handleEvent(ctx, chainID, event)
+				if err != nil {
+					log.Errorf("svc.handleEvent: %v", err)
+				}
+
+				return nil
+			})
+		}
+	}
+}
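Compared with the old subscribe (deleted from filter_then_subscribe.go above), which returned on the first subscription error, event.ResubscribeErr keeps the stream alive: when the underlying WatchMessageSent subscription fails, it calls the function again to reconnect, backing off up to svc.subscriptionBackoff between attempts, and passes the previous error in so it can be logged. A minimal sketch of the mechanism, with a stub subscription standing in for the bridge watcher:

    package main

    import (
    	"context"
    	"fmt"
    	"time"

    	"github.com/ethereum/go-ethereum/event"
    )

    func main() {
    	sub := event.ResubscribeErr(3*time.Second,
    		func(ctx context.Context, err error) (event.Subscription, error) {
    			if err != nil {
    				// err is what ended the previous subscription attempt.
    				fmt.Println("resubscribing after error:", err)
    			}

    			// Establish a fresh subscription here; the relayer returns
    			// svc.bridge.WatchMessageSent(...) in this spot.
    			return event.NewSubscription(func(quit <-chan struct{}) error {
    				<-quit // stub producer: block until unsubscribed
    				return nil
    			}), nil
    		})

    	defer sub.Unsubscribe()
    }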
diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go
index 160244b9f9..2ab2943bce 100644
--- a/packages/relayer/message/process_message.go
+++ b/packages/relayer/message/process_message.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/ethereum/go-ethereum/accounts/abi/bind"
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
@@ -56,19 +57,6 @@ func (p *Processor) ProcessMessage(
 
 	log.Infof("processing message for signal: %v, key: %v", common.Hash(event.Signal).Hex(), key)
 
-	auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, event.Message.DestChainId)
-	if err != nil {
-		return errors.Wrap(err, "bind.NewKeyedTransactorWithChainID")
-	}
-
-	auth.Context = ctx
-
-	// uncomment to skip `eth_estimateGas`
-	auth.GasLimit = 2000000
-	auth.GasPrice = new(big.Int).SetUint64(500000000)
-
-	log.Infof("getting proof")
-
 	encodedSignalProof, err := p.prover.EncodedSignalProof(ctx, p.rpc, event.Raw.Address, key, latestSyncedHeader)
 	if err != nil {
 		return errors.Wrap(err, "p.prover.GetEncodedSignalProof")
@@ -84,18 +72,15 @@ func (p *Processor) ProcessMessage(
 		return errors.Wrap(err, "p.destBridge.IsMessageReceived")
 	}
 
-	log.Infof("isMessageReceived: %v", received)
-
 	// message will fail when we try to process is
 	// TODO: update status in db
 	if !received {
 		return errors.New("message not received")
 	}
 
-	// process the message on the destination bridge.
-	tx, err := p.destBridge.ProcessMessage(auth, event.Message, encodedSignalProof)
+	tx, err := p.sendProcessMessageCall(ctx, event, encodedSignalProof)
 	if err != nil {
-		return errors.Wrap(err, "p.destBridge.ProcessMessage")
+		return errors.Wrap(err, "p.sendProcessMessageCall")
 	}
 
 	log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes()))
@@ -122,6 +107,59 @@ func (p *Processor) ProcessMessage(
 	return nil
 }
 
+func (p *Processor) sendProcessMessageCall(
+	ctx context.Context,
+	event *contracts.BridgeMessageSent,
+	proof []byte,
+) (*types.Transaction, error) {
+	auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, event.Message.DestChainId)
+	if err != nil {
+		return nil, errors.Wrap(err, "bind.NewKeyedTransactorWithChainID")
+	}
+
+	auth.Context = ctx
+
+	// set gas limit and price manually to skip `eth_estimateGas`
+	auth.GasLimit = 2000000
+	auth.GasPrice = new(big.Int).SetUint64(500000000)
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	err = p.getLatestNonce(ctx, auth)
+	if err != nil {
+		return nil, errors.Wrap(err, "p.getLatestNonce")
+	}
+	// process the message on the destination bridge.
+	tx, err := p.destBridge.ProcessMessage(auth, event.Message, proof)
+	if err != nil {
+		return nil, errors.Wrap(err, "p.destBridge.ProcessMessage")
+	}
+
+	p.setLatestNonce(tx.Nonce())
+
+	return tx, nil
+}
+
+func (p *Processor) setLatestNonce(nonce uint64) {
+	p.destNonce = nonce
+}
+
+func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error {
+	pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr)
+	if err != nil {
+		return err
+	}
+
+	if pendingNonce > p.destNonce {
+		p.setLatestNonce(pendingNonce)
+	}
+
+	auth.Nonce = big.NewInt(int64(p.destNonce))
+
+	return nil
+}
+
 func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error {
 	// TODO: make timeout a config var
 	ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute)
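ProcessMessage can now run on many goroutines at once, so nonce assignment has to be serialized: the mutex is held across the pending-nonce lookup and the ProcessMessage send, and destNonce caches the highest nonce handed out in case the node's pending count lags behind transactions just submitted. A condensed sketch of the same pattern, detached from the Processor type and using hypothetical names:

    package nonce

    import (
    	"context"
    	"math/big"
    	"sync"

    	"github.com/ethereum/go-ethereum/accounts/abi/bind"
    	"github.com/ethereum/go-ethereum/common"
    	"github.com/ethereum/go-ethereum/ethclient"
    )

    // assigner hands out nonces for a single key; submit runs under the lock
    // so no other sender can claim the same nonce mid-flight.
    type assigner struct {
    	mu     sync.Mutex
    	nonce  uint64
    	client *ethclient.Client
    	addr   common.Address
    }

    func (a *assigner) send(ctx context.Context, auth *bind.TransactOpts,
    	submit func(*bind.TransactOpts) (uint64, error)) error {
    	a.mu.Lock()
    	defer a.mu.Unlock()

    	pending, err := a.client.PendingNonceAt(ctx, a.addr)
    	if err != nil {
    		return err
    	}

    	// Trust whichever is higher: the node's view or the local cache.
    	if pending > a.nonce {
    		a.nonce = pending
    	}

    	auth.Nonce = big.NewInt(int64(a.nonce))

    	// submit returns the nonce actually used (tx.Nonce() in the relayer).
    	used, err := submit(auth)
    	if err != nil {
    		return err
    	}

    	a.nonce = used

    	return nil
    }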
diff --git a/packages/relayer/message/processor.go b/packages/relayer/message/processor.go
index 34ef0d0f19..db2781e9f8 100644
--- a/packages/relayer/message/processor.go
+++ b/packages/relayer/message/processor.go
@@ -2,7 +2,9 @@ package message
 
 import (
 	"crypto/ecdsa"
+	"sync"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/rpc"
 
 	"github.com/ethereum/go-ethereum/ethclient"
@@ -23,6 +25,10 @@ type Processor struct {
 
 	prover *proof.Prover
 
+	mu *sync.Mutex
+
+	destNonce uint64
+
 	relayerAddr   common.Address
 	confirmations uint64
 }
@@ -35,6 +41,7 @@ type NewProcessorOpts struct {
 	DestBridge       *contracts.Bridge
 	EventRepo        relayer.EventRepository
 	DestHeaderSyncer *contracts.IHeaderSync
+	RelayerAddress   common.Address
 	Confirmations    uint64
 }
 
@@ -87,6 +94,10 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) {
 		destBridge:       opts.DestBridge,
 		destHeaderSyncer: opts.DestHeaderSyncer,
 
+		mu: &sync.Mutex{},
+
+		destNonce: 0,
+
 		relayerAddr:   opts.RelayerAddress,
 		confirmations: opts.Confirmations,
 	}, nil
 }
diff --git a/packages/relayer/proof/encoded_signal_proof.go b/packages/relayer/proof/encoded_signal_proof.go
index 8737e6eecf..766568a04f 100644
--- a/packages/relayer/proof/encoded_signal_proof.go
+++ b/packages/relayer/proof/encoded_signal_proof.go
@@ -11,7 +11,6 @@ import (
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
 )
 
 // EncodedSignalProof rlp and abi encodes the SignalProof struct expected by LibBridgeSignal
@@ -43,8 +42,6 @@ func (p *Prover) EncodedSignalProof(
 		return nil, errors.Wrap(err, "enoding.EncodeSignalProof")
 	}
 
-	log.Infof("signalProof: %s", hexutil.Encode(encodedSignalProof))
-
 	return encodedSignalProof, nil
 }