Skip to content

Commit

Permalink
feat: cross network retry (#337)
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 authored Nov 4, 2024
1 parent 03ec27c commit eb43206
Show file tree
Hide file tree
Showing 40 changed files with 2,532 additions and 670 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ genmocks:
mockgen --package mock_tss -destination=./tss/mock/frost.go -source=./tss/frost/keygen/keygen.go
mockgen -source=./tss/coordinator.go -destination=./tss/mock/coordinator.go
mockgen -source=./comm/communication.go -destination=./comm/mock/communication.go
mockgen -source=./chains/evm/listener/eventHandlers/event-handler.go -destination=./chains/evm/listener/eventHandlers/mock/listener.go
mockgen -source=./chains/evm/listener/eventHandlers/deposit.go -destination=./chains/evm/listener/eventHandlers/mock/listener.go
mockgen -source=./chains/evm/listener/eventHandlers/retry.go -destination=./chains/evm/listener/eventHandlers/mock/retry.go
mockgen -source=./chains/evm/calls/events/listener.go -destination=./chains/evm/calls/events/mock/listener.go
mockgen -source=./chains/substrate/listener/event-handlers.go -destination=./chains/substrate/listener/mock/handlers.go
mockgen -source=./chains/btc/listener/event-handlers.go -destination=./chains/btc/listener/mock/handlers.go
mockgen -source=./chains/btc/listener/listener.go -destination=./chains/btc/listener/mock/listener.go
mockgen -source=./topology/topology.go -destination=./topology/mock/topology.go
mockgen -source=./chains/btc/executor/message-handler.go -destination=./chains/btc/executor/mock/message-handler.go
mockgen -source=./chains/substrate/executor/message-handler.go -destination=./chains/substrate/executor/mock/message-handler.go
mockgen -source=./chains/evm/executor/message-handler.go -destination=./chains/evm/executor/mock/message-handler.go


e2e-test:
Expand Down
42 changes: 26 additions & 16 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ import (
"github.com/ChainSafe/sygma-relayer/chains/evm/calls/events"
"github.com/ChainSafe/sygma-relayer/chains/evm/executor"
"github.com/ChainSafe/sygma-relayer/chains/evm/listener/depositHandlers"
hubEventHandlers "github.com/ChainSafe/sygma-relayer/chains/evm/listener/eventHandlers"
evmEventHandlers "github.com/ChainSafe/sygma-relayer/chains/evm/listener/eventHandlers"
"github.com/ChainSafe/sygma-relayer/chains/substrate"
"github.com/ChainSafe/sygma-relayer/relayer/retry"
"github.com/ChainSafe/sygma-relayer/relayer/transfer"
propStore "github.com/ChainSafe/sygma-relayer/store"
"github.com/sygmaprotocol/sygma-core/chains/evm/transactor/gas"
Expand Down Expand Up @@ -192,11 +193,7 @@ func Run() error {
bridgeContract := bridge.NewBridgeContract(client, bridgeAddress, t)

depositHandler := depositHandlers.NewETHDepositHandler(bridgeContract)
mh := message.NewMessageHandler()
for _, handler := range config.Handlers {

mh.RegisterMessageHandler(transfer.TransferMessageType, &executor.TransferMessageHandler{})

switch handler.Type {
case "erc20", "native":
{
Expand All @@ -220,12 +217,21 @@ func Run() error {
tssListener := events.NewListener(client)
eventHandlers := make([]listener.EventHandler, 0)
l := log.With().Str("chain", fmt.Sprintf("%v", config.GeneralChainConfig.Name)).Uint8("domainID", *config.GeneralChainConfig.Id)
eventHandlers = append(eventHandlers, hubEventHandlers.NewDepositEventHandler(depositListener, depositHandler, bridgeAddress, *config.GeneralChainConfig.Id, msgChan))
eventHandlers = append(eventHandlers, hubEventHandlers.NewKeygenEventHandler(l, tssListener, coordinator, host, communication, keyshareStore, bridgeAddress, networkTopology.Threshold))
eventHandlers = append(eventHandlers, hubEventHandlers.NewFrostKeygenEventHandler(l, tssListener, coordinator, host, communication, frostKeyshareStore, frostAddress, networkTopology.Threshold))
eventHandlers = append(eventHandlers, hubEventHandlers.NewRefreshEventHandler(l, topologyProvider, topologyStore, tssListener, coordinator, host, communication, connectionGate, keyshareStore, frostKeyshareStore, bridgeAddress))
eventHandlers = append(eventHandlers, hubEventHandlers.NewRetryEventHandler(l, tssListener, depositHandler, propStore, bridgeAddress, *config.GeneralChainConfig.Id, config.BlockConfirmations, msgChan))

depositEventHandler := evmEventHandlers.NewDepositEventHandler(depositListener, depositHandler, bridgeAddress, *config.GeneralChainConfig.Id, msgChan)
eventHandlers = append(eventHandlers, depositEventHandler)
eventHandlers = append(eventHandlers, evmEventHandlers.NewKeygenEventHandler(l, tssListener, coordinator, host, communication, keyshareStore, bridgeAddress, networkTopology.Threshold))
eventHandlers = append(eventHandlers, evmEventHandlers.NewFrostKeygenEventHandler(l, tssListener, coordinator, host, communication, frostKeyshareStore, frostAddress, networkTopology.Threshold))
eventHandlers = append(eventHandlers, evmEventHandlers.NewRefreshEventHandler(l, topologyProvider, topologyStore, tssListener, coordinator, host, communication, connectionGate, keyshareStore, frostKeyshareStore, bridgeAddress))
eventHandlers = append(eventHandlers, evmEventHandlers.NewRetryV1EventHandler(l, tssListener, depositHandler, propStore, bridgeAddress, *config.GeneralChainConfig.Id, config.BlockConfirmations, msgChan))
if config.Retry != "" {
eventHandlers = append(eventHandlers, evmEventHandlers.NewRetryV2EventHandler(l, tssListener, common.HexToAddress(config.Retry), *config.GeneralChainConfig.Id, msgChan))
}
evmListener := listener.NewEVMListener(client, eventHandlers, blockstore, sygmaMetrics, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockConfirmations, config.BlockInterval)

mh := message.NewMessageHandler()
mh.RegisterMessageHandler(retry.RetryMessageType, executor.NewRetryMessageHandler(depositEventHandler, client, propStore, config.BlockConfirmations, msgChan))
mh.RegisterMessageHandler(transfer.TransferMessageType, &executor.TransferMessageHandler{})
executor := executor.NewExecutor(host, communication, coordinator, bridgeContract, keyshareStore, exitLock, config.GasLimit.Uint64(), config.TransferGas)

startBlock, err := blockstore.GetStartBlock(*config.GeneralChainConfig.Id, config.StartBlock, config.GeneralChainConfig.LatestBlock, config.GeneralChainConfig.FreshStart)
Expand Down Expand Up @@ -272,13 +278,14 @@ func Run() error {
depositHandler := substrateListener.NewSubstrateDepositHandler()
depositHandler.RegisterDepositHandler(transfer.FungibleTransfer, substrateListener.FungibleTransferHandler)
eventHandlers := make([]coreSubstrateListener.EventHandler, 0)
eventHandlers = append(eventHandlers, substrateListener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler, msgChan, conn))
depositEventHandler := substrateListener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler, msgChan, conn)
eventHandlers = append(eventHandlers, substrateListener.NewRetryEventHandler(l, conn, depositHandler, *config.GeneralChainConfig.Id, msgChan))

eventHandlers = append(eventHandlers, depositEventHandler)
substrateListener := coreSubstrateListener.NewSubstrateListener(conn, eventHandlers, blockstore, sygmaMetrics, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockInterval)

mh := message.NewMessageHandler()
mh.RegisterMessageHandler(transfer.TransferMessageType, &substrateExecutor.SubstrateMessageHandler{})
mh.RegisterMessageHandler(retry.RetryMessageType, substrateExecutor.NewRetryMessageHandler(depositEventHandler, conn, propStore, msgChan))

sExecutor := substrateExecutor.NewExecutor(host, communication, coordinator, bridgePallet, keyshareStore, conn, exitLock)

Expand Down Expand Up @@ -319,17 +326,20 @@ func Run() error {
}

l := log.With().Str("chain", fmt.Sprintf("%v", config.GeneralChainConfig.Name)).Uint8("domainID", *config.GeneralChainConfig.Id)
depositHandler := &btcListener.BtcDepositHandler{}
eventHandlers := make([]btcListener.EventHandler, 0)
resources := make(map[[32]byte]btcConfig.Resource)
for _, resource := range config.Resources {
resources[resource.ResourceID] = resource
eventHandlers = append(eventHandlers, btcListener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler, msgChan, conn, resource, config.FeeAddress))
}
depositHandler := &btcListener.BtcDepositHandler{}
depositEventHandler := btcListener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler, msgChan, conn, resources, config.FeeAddress)
eventHandlers := make([]btcListener.EventHandler, 0)
eventHandlers = append(eventHandlers, depositEventHandler)
listener := btcListener.NewBtcListener(conn, eventHandlers, config, blockstore)

mempool := mempool.NewMempoolAPI(config.MempoolUrl)
mh := &btcExecutor.BtcMessageHandler{}
mh := message.NewMessageHandler()
mh.RegisterMessageHandler(transfer.TransferMessageType, &btcExecutor.FungibleMessageHandler{})
mh.RegisterMessageHandler(retry.RetryMessageType, btcExecutor.NewRetryMessageHandler(depositEventHandler, conn, config.BlockConfirmations, propStore, msgChan))
uploader := uploader.NewIPFSUploader(configuration.RelayerConfig.UploaderConfig)

executor := btcExecutor.NewExecutor(
Expand Down
4 changes: 2 additions & 2 deletions chains/btc/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type BtcChain struct {

listener EventListener
executor *executor.Executor
mh *executor.BtcMessageHandler
mh *message.MessageHandler

startBlock *big.Int
logger zerolog.Logger
Expand All @@ -34,7 +34,7 @@ type BtcChain struct {
func NewBtcChain(
listener EventListener,
executor *executor.Executor,
mh *executor.BtcMessageHandler,
mh *message.MessageHandler,
id uint8,
) *BtcChain {
return &BtcChain{
Expand Down
5 changes: 0 additions & 5 deletions chains/btc/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ type MempoolAPI interface {
Utxos(address string) ([]mempool.Utxo, error)
}

type PropStorer interface {
StorePropStatus(source, destination uint8, depositNonce uint64, status store.PropStatus) error
PropStatus(source, destination uint8, depositNonce uint64) (store.PropStatus, error)
}

type Executor struct {
coordinator *tss.Coordinator
host host.Host
Expand Down
81 changes: 79 additions & 2 deletions chains/btc/executor/message-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ package executor

import (
"errors"
"fmt"
"math/big"

"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/sygmaprotocol/sygma-core/relayer/message"
"github.com/sygmaprotocol/sygma-core/relayer/proposal"

"github.com/ChainSafe/sygma-relayer/relayer/retry"
"github.com/ChainSafe/sygma-relayer/relayer/transfer"
"github.com/ChainSafe/sygma-relayer/store"
)

type BtcTransferProposalData struct {
Expand All @@ -26,9 +31,9 @@ type BtcTransferProposal struct {
Data BtcTransferProposalData
}

type BtcMessageHandler struct{}
type FungibleMessageHandler struct{}

func (h *BtcMessageHandler) HandleMessage(msg *message.Message) (*proposal.Proposal, error) {
func (h *FungibleMessageHandler) HandleMessage(msg *message.Message) (*proposal.Proposal, error) {
transferMessage := &transfer.TransferMessage{
Source: msg.Source,
Destination: msg.Destination,
Expand Down Expand Up @@ -70,3 +75,75 @@ func ERC20MessageHandler(msg *transfer.TransferMessage) (*proposal.Proposal, err
ResourceId: msg.Data.ResourceId,
}, msg.ID, transfer.TransferProposalType), nil
}

type BlockFetcher interface {
GetBlockVerboseTx(*chainhash.Hash) (*btcjson.GetBlockVerboseTxResult, error)
GetBestBlockHash() (*chainhash.Hash, error)
}

type PropStorer interface {
StorePropStatus(source, destination uint8, depositNonce uint64, status store.PropStatus) error
PropStatus(source, destination uint8, depositNonce uint64) (store.PropStatus, error)
}

type DepositProcessor interface {
ProcessDeposits(blockNumber *big.Int) (map[uint8][]*message.Message, error)
}

type RetryMessageHandler struct {
depositProcessor DepositProcessor
blockFetcher BlockFetcher
blockConfirmations *big.Int
propStorer PropStorer
msgChan chan []*message.Message
}

func NewRetryMessageHandler(
depositProcessor DepositProcessor,
blockFetcher BlockFetcher,
blockConfirmations *big.Int,
propStorer PropStorer,
msgChan chan []*message.Message) *RetryMessageHandler {
return &RetryMessageHandler{
depositProcessor: depositProcessor,
blockFetcher: blockFetcher,
blockConfirmations: blockConfirmations,
propStorer: propStorer,
msgChan: msgChan,
}
}

func (h *RetryMessageHandler) HandleMessage(msg *message.Message) (*proposal.Proposal, error) {
retryData := msg.Data.(retry.RetryMessageData)
hash, err := h.blockFetcher.GetBestBlockHash()
if err != nil {
return nil, err
}
block, err := h.blockFetcher.GetBlockVerboseTx(hash)
if err != nil {
return nil, err
}
latestBlock := big.NewInt(block.Height)
if latestBlock.Cmp(new(big.Int).Add(retryData.BlockHeight, h.blockConfirmations)) != 1 {
return nil, fmt.Errorf(
"latest block %s higher than receipt block number + block confirmations %s",
latestBlock,
new(big.Int).Add(retryData.BlockHeight, h.blockConfirmations),
)
}

domainDeposits, err := h.depositProcessor.ProcessDeposits(retryData.BlockHeight)
if err != nil {
return nil, err
}
filteredDeposits, err := retry.FilterDeposits(h.propStorer, domainDeposits, retryData.ResourceID, retryData.DestinationDomainID)
if err != nil {
return nil, err
}
if len(filteredDeposits) == 0 {
return nil, nil
}

h.msgChan <- filteredDeposits
return nil, nil
}
Loading

0 comments on commit eb43206

Please sign in to comment.