Skip to content

Commit

Permalink
feat(relayer): Asynchronous message processing, error handling, nonce…
Browse files Browse the repository at this point in the history
… management, and indexer folder structuring (#259)

* wg/errchan handling, block batch size config param for ndexer, .default.env update, indexer folder file restructuring, nonce handling for message processor

* mysql conn params + block batch size param

* wip tests fo indexer + go mod tidy

* add gosimple linter

* refactor handle_even with cleaner handle methods. test for canProcessMessage. error handling return in handle_event after filling errChan

* subscribe return error

* Update packages/relayer/cli/cli.go

Co-authored-by: David <[email protected]>

* check negative ints for configs

* Update packages/relayer/indexer/watch_errors.go

Co-authored-by: David <[email protected]>

* Defer mutex unlock in process message

* lint

* waitgroup => errgroup

* use ResubscribeErr

* subscription backoff in seconds

* Update packages/relayer/indexer/filter_then_subscribe.go

Co-authored-by: David <[email protected]>

* lint

* lint

* bump lint funlen

Co-authored-by: David <[email protected]>
  • Loading branch information
cyberhorsey and davidtaikocha authored Nov 17, 2022
1 parent 036e022 commit ed6d551
Show file tree
Hide file tree
Showing 16 changed files with 534 additions and 224 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect
google.golang.org/grpc v1.47.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
16 changes: 12 additions & 4 deletions packages/relayer/.default.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@ MYSQL_PASSWORD=root
MYSQL_DATABASE=relayer
MYSQL_HOST=localhost:3306
RELAYER_ECDSA_KEY=
L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10
L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_"
L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws"
L1_BRIDGE_ADDRESS=0xB12d6112D64B213880Fa53F815aF1F29c91CaCe9
L2_BRIDGE_ADDRESS=0x4eA05A0f7713333AeB4bB73F17aEeFE146CF13E3
L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D
L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4
L1_RPC_URL=ws://34.132.67.34:8546
L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz
BLOCK_BATCH_SIZE=2
MYSQL_MAX_IDLE_CONNS=
MYSQL_MAX_OPEN_CONNS=
MYSQL_CONN_MAX_LIFETIME_IN_MS=
NUM_GOROUTINES=20
SUBSCRIPTION_BACKOFF_IN_SECONDS=3
CONFIRMATIONS_BEFORE_PROCESSING=15
7 changes: 4 additions & 3 deletions packages/relayer/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@ linters:
- gofmt
- golint
- gosec
- gosimple
- lll
- whitespace
- wsl

linters-settings:
funlen:
lines: 105
statements: 45
lines: 116
statements: 48
gocognit:
min-complexity: 32
min-complexity: 35

issues:
exclude-rules:
Expand Down
74 changes: 71 additions & 3 deletions packages/relayer/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -36,7 +37,10 @@ var (
"CONFIRMATIONS_BEFORE_PROCESSING",
}

defaultConfirmations = 15
defaultBlockBatchSize = 2
defaultNumGoroutines = 10
defaultSubscriptionBackoff = 2 * time.Second
defaultConfirmations = 15
)

func Run(mode relayer.Mode, layer relayer.Layer) {
Expand Down Expand Up @@ -111,6 +115,25 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
return nil, nil, err
}

blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE"))
if err != nil || blockBatchSize <= 0 {
blockBatchSize = defaultBlockBatchSize
}

numGoroutines, err := strconv.Atoi(os.Getenv("NUM_GOROUTINES"))
if err != nil || numGoroutines <= 0 {
numGoroutines = defaultNumGoroutines
}

var subscriptionBackoff time.Duration

subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS"))
if err != nil || numGoroutines <= 0 {
subscriptionBackoff = defaultSubscriptionBackoff
} else {
subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second
}

confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING"))
if err != nil || confirmations <= 0 {
confirmations = defaultConfirmations
Expand All @@ -132,7 +155,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")),

Confirmations: uint64(confirmations),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand All @@ -155,7 +181,10 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(),
DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")),
DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")),

Confirmations: uint64(confirmations),
BlockBatchSize: uint64(blockBatchSize),
NumGoroutines: numGoroutines,
SubscriptionBackoff: subscriptionBackoff,
Confirmations: uint64(confirmations),
})
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -200,6 +229,45 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB {
log.Fatal(err)
}

sqlDB, err := db.DB()
if err != nil {
log.Fatal(err)
}

var (
defaultMaxIdleConns = 50
defaultMaxOpenConns = 200
defaultConnMaxLifetime = 10 * time.Second
)

maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS"))
if err != nil || maxIdleConns <= 0 {
maxIdleConns = defaultMaxIdleConns
}

maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS"))
if err != nil || maxOpenConns <= 0 {
maxOpenConns = defaultMaxOpenConns
}

var maxLifetime time.Duration

connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS"))
if err != nil || connMaxLifetime <= 0 {
maxLifetime = defaultConnMaxLifetime
} else {
maxLifetime = time.Duration(connMaxLifetime)
}

// SetMaxOpenConns sets the maximum number of open connections to the database.
sqlDB.SetMaxOpenConns(maxOpenConns)

// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
sqlDB.SetMaxIdleConns(maxIdleConns)

// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
sqlDB.SetConnMaxLifetime(maxLifetime)

return db
}

Expand Down
Loading

0 comments on commit ed6d551

Please sign in to comment.