Skip to content

Commit

Permalink
optimize pool performance (#28)
Browse files Browse the repository at this point in the history
* feat: optimize pool performance by no longer storing pending and done jobs in the DB

* chore: insert job to DB in Update function

* fix: ensure statsTick reporter is started only once
  • Loading branch information
DNK90 committed Dec 22, 2022
1 parent 375d6a0 commit e2b767c
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 56 deletions.
214 changes: 214 additions & 0 deletions benchmark/bridge_core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package benchmark

import (
"context"
"fmt"
bridge_core "github.com/axieinfinity/bridge-core"
"github.com/axieinfinity/bridge-core/models"
"github.com/axieinfinity/bridge-core/stores"
"github.com/ethereum/go-ethereum/common"
"gorm.io/gorm"
"math/big"
"math/rand"
"os"
"strconv"
"testing"
"time"
)

// testData holds the randomly generated payload strings fed to the pool in
// BenchmarkPool. Its length is controlled by the "size" environment variable.
var testData = make([]string, 0)

func init() {
	// An unset or non-numeric "size" makes Atoi fail, leaving dataLength == 0,
	// so the benchmark runs with no test data.
	dataLength, _ := strconv.Atoi(os.Getenv("size"))
	if dataLength > 0 {
		// Pre-size: the final length is known, so avoid repeated growth copies.
		testData = make([]string, 0, dataLength)
	}
	for i := 0; i < dataLength; i++ {
		testData = append(testData, fmt.Sprintf("%v", rand.Uint32()))
	}
}

// Job is a minimal job implementation used to benchmark the pool.
// Process is a no-op, so measured cost comes from pool scheduling and the
// DB write performed in Update.
type Job struct {
	store stores.MainStore
	// id starts at the value passed to NewJob and is overwritten with the
	// DB-assigned row ID inside Update.
	id      int32
	jobType int

	// Retry bookkeeping consumed by the pool's retry logic.
	retryCount int
	maxTry     int
	nextTry    int64
	backOff    int

	data []byte
}

// NewJob builds a benchmark job carrying the given payload. The job allows
// up to 20 attempts with a back-off factor of 5; retryCount starts at its
// zero value.
func NewJob(id int32, store stores.MainStore, data []byte) *Job {
	j := &Job{
		store:   store,
		data:    data,
		id:      id,
		maxTry:  20,
		backOff: 5,
	}
	return j
}

// FromChainID always returns nil: the benchmark job is not tied to a chain.
func (e *Job) FromChainID() *big.Int {
	return nil
}

// GetID returns the job's current ID (DB-assigned after Update).
func (e *Job) GetID() int32 {
	return e.id
}

// GetType returns the job type; never set in the benchmark, so always 0.
func (e *Job) GetType() int {
	return e.jobType
}

// GetRetryCount returns how many times the job has been retried so far.
func (e *Job) GetRetryCount() int {
	return e.retryCount
}

// GetNextTry returns the timestamp of the next retry attempt
// (set via UpdateNextTry).
func (e *Job) GetNextTry() int64 {
	return e.nextTry
}

// GetMaxTry returns the maximum number of attempts allowed (20 from NewJob).
func (e *Job) GetMaxTry() int {
	return e.maxTry
}

// GetData returns the raw payload attached to the job.
func (e *Job) GetData() []byte {
	return e.data
}

// GetValue always returns nil: the benchmark job carries no monetary value.
func (e *Job) GetValue() *big.Int {
	return nil
}

// GetBackOff returns the retry back-off factor (5 from NewJob).
func (e *Job) GetBackOff() int {
	return e.backOff
}

// Process is intentionally a no-op so the benchmark measures pool overhead
// rather than job work.
func (e *Job) Process() ([]byte, error) {
	return nil, nil
}

// String renders a short human-readable summary of the job for logging.
func (e *Job) String() string {
	jobType := e.GetType()
	subscription := e.GetSubscriptionName()
	retries := e.GetRetryCount()
	return fmt.Sprintf("{Type:%v, Subscription:%v, RetryCount: %v}", jobType, subscription, retries)
}

// Hash derives a deterministic identifier from the job's ID, retry count and
// next-try timestamp.
func (e *Job) Hash() common.Hash {
	key := fmt.Sprintf("j-%d-%d-%d", e.id, e.retryCount, e.nextTry)
	return common.BytesToHash([]byte(key))
}

// IncreaseRetryCount bumps the retry counter by one.
func (e *Job) IncreaseRetryCount() {
	e.retryCount++
}
// UpdateNextTry records when the job should next be attempted.
func (e *Job) UpdateNextTry(nextTry int64) {
	e.nextTry = nextTry
}

// GetListener always returns nil: the benchmark job has no chain listener.
func (e *Job) GetListener() bridge_core.Listener {
	return nil
}

// GetSubscriptionName always returns the empty string: no subscription is
// associated with a benchmark job.
func (e *Job) GetSubscriptionName() string {
	return ""
}

// GetTransaction always returns nil: the benchmark job wraps no transaction.
func (e *Job) GetTransaction() bridge_core.Transaction {
	return nil
}

// Save is intentionally a no-op: this commit stops persisting pending jobs
// to the DB (persistence now happens only in Update). The previous
// implementation is kept below, commented out, for reference.
func (e *Job) Save() error {
	//job := &models.Job{
	//	Listener:         "",
	//	SubscriptionName: e.GetSubscriptionName(),
	//	Type:             e.GetType(),
	//	RetryCount:       e.retryCount,
	//	Status:           stores.STATUS_PENDING,
	//	Data:             common.Bytes2Hex(e.GetData()),
	//	Transaction:      "",
	//	CreatedAt:        time.Now().Unix(),
	//	FromChainId:      "",
	//}
	//if err := e.store.GetJobStore().Save(job); err != nil {
	//	return err
	//}
	//e.id = int32(job.ID)
	return nil
}

// Update inserts a fresh row for the job with the given status and adopts the
// DB-assigned row ID. Note it saves a new record rather than updating an
// existing one, matching the "insert job to DB in Update" change.
func (e *Job) Update(status string) error {
	record := models.Job{
		Status:           status,
		Type:             e.GetType(),
		RetryCount:       e.retryCount,
		Data:             common.Bytes2Hex(e.GetData()),
		SubscriptionName: e.GetSubscriptionName(),
		CreatedAt:        time.Now().Unix(),
		Listener:         "",
		Transaction:      "",
		FromChainId:      "",
	}
	if err := e.store.GetJobStore().Save(&record); err != nil {
		return err
	}
	e.id = int32(record.ID)
	return nil
}

// SetID overwrites the job's ID.
func (e *Job) SetID(id int32) {
	e.id = id
}

// CreatedAt returns the current time on every call rather than a stored
// creation timestamp — sufficient for the benchmark, but callers should not
// rely on it being stable.
func (e *Job) CreatedAt() time.Time {
	return time.Now()
}

// addWorkers creates cfg.NumberOfWorkers workers wired to the pool's channels
// and registers them with the pool.
func addWorkers(ctx context.Context, pool *bridge_core.Pool, cfg *bridge_core.Config) {
	// Pre-size: the worker count is known up front, so avoid repeated
	// growth copies during append.
	workers := make([]bridge_core.Worker, 0, cfg.NumberOfWorkers)
	for i := 0; i < cfg.NumberOfWorkers; i++ {
		workers = append(workers, bridge_core.NewWorker(ctx, i, pool.PrepareJobChan, pool.FailedJobChan, pool.Queue, pool.MaxQueueSize, nil))
	}
	pool.AddWorkers(workers)
}

// newPool constructs a pool with the requested worker count, attaches its
// workers, and returns it ready to Start.
func newPool(ctx context.Context, db *gorm.DB, numberOfWorkers int) *bridge_core.Pool {
	cfg := &bridge_core.Config{NumberOfWorkers: numberOfWorkers}
	p := bridge_core.NewPool(ctx, cfg, db, nil)
	addWorkers(ctx, p, cfg)
	return p
}

// BenchmarkPool measures end-to-end pool throughput: it pushes every entry of
// testData through PrepareJobChan and waits until all pool channels drain,
// then prints the elapsed wall time.
//
// NOTE(review): this ignores b.N, so timing is driven by the "size" env var
// rather than the benchmark framework; consider a b.Loop/b.N-based harness.
func BenchmarkPool(b *testing.B) {
	dbCfg := &stores.Database{
		Host:            "localhost",
		User:            "postgres",
		Password:        "example",
		DBName:          "bench_mark_db",
		Port:            5432,
		ConnMaxLifetime: 200,
		MaxIdleConns:    200,
		MaxOpenConns:    200,
	}
	// init db based on config
	db, err := stores.MustConnectDatabase(dbCfg, false)
	if err != nil {
		panic(err)
	}
	// fix: the AutoMigrate error was silently ignored; a failed migration
	// would previously surface only as confusing insert errors later.
	if err := db.AutoMigrate(&models.Job{}); err != nil {
		panic(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	pool := newPool(ctx, db, 8192)
	go pool.Start(nil)

	store := stores.NewMainStore(db)
	now := time.Now()
	for _, data := range testData {
		pool.PrepareJobChan <- NewJob(0, store, []byte(data))
	}
	// Poll until every channel is drained. The original loop spun without
	// sleeping, burning a full core and skewing the measurement; a short
	// sleep keeps the check cheap with negligible timing impact.
	for {
		if len(pool.PrepareJobChan) == 0 && len(pool.JobChan) == 0 && len(pool.FailedJobChan) == 0 && len(pool.RetryJobChan) == 0 {
			cancel()
			pool.Wait()
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	// time.Since(now) is the idiomatic form of time.Now().Sub(now).
	println(fmt.Sprintf("total time: %d (ms)", time.Since(now).Milliseconds()))
}
22 changes: 21 additions & 1 deletion controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func New(cfg *Config, db *gorm.DB, helpers utils.Utils) (*Controller, error) {
var workers []Worker
// init workers
for i := 0; i < cfg.NumberOfWorkers; i++ {
w := NewWorker(ctx, i, c.Pool.PrepareJobChan, c.Pool.FailedJobChan, c.Pool.SuccessJobChan, c.Pool.Queue, c.Pool.MaxQueueSize, c.listeners)
w := NewWorker(ctx, i, c.Pool.PrepareJobChan, c.Pool.FailedJobChan, c.Pool.Queue, c.Pool.MaxQueueSize, c.listeners)
workers = append(workers, w)
}
c.Pool.AddWorkers(workers)
Expand Down Expand Up @@ -223,6 +223,8 @@ func (c *Controller) processPendingJobs() {
// add job to jobChan
if j != nil {
c.Pool.PrepareJobChan <- j
job.Status = stores.STATUS_PROCESSED
c.store.GetJobStore().Update(job)
}
}
}
Expand Down Expand Up @@ -298,6 +300,19 @@ func (c *Controller) startListening(listener Listener, tryCount int) {
return
}
}

// start stats reporter
statsTick := time.NewTicker(time.Duration(2) * time.Second)
go func() {
for {
select {
case <-statsTick.C:
stats := c.Pool.Stats()
log.Info("[Controller] pool stats", "pending", stats.PendingQueue, "queue", stats.Queue)
}
}
}()

// start listening to block's events
tick := time.NewTicker(listener.Period())
for {
Expand All @@ -307,6 +322,11 @@ func (c *Controller) startListening(listener Listener, tryCount int) {
case <-tick.C:
// stop if the pool is closed
if c.Pool.IsClosed() {
// stop timer
tick.Stop()
statsTick.Stop()
// wait for pool is totally shutdown
c.Pool.Wait()
return
}
latest, err := listener.GetLatestBlockHeight()
Expand Down
5 changes: 3 additions & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ func (e *BaseJob) Save() error {

func (e *BaseJob) Update(status string) error {
job := &models.Job{
ID: int(e.id),
Listener: e.listener.GetName(),
SubscriptionName: e.subscriptionName,
Type: e.jobType,
Expand All @@ -191,9 +190,11 @@ func (e *BaseJob) Update(status string) error {
CreatedAt: time.Now().Unix(),
FromChainId: hexutil.EncodeBig(e.fromChainID),
}
if err := e.listener.GetStore().GetJobStore().Update(job); err != nil {
if err := e.listener.GetStore().GetJobStore().Save(job); err != nil {
return err
}
e.id = int32(job.ID)
e.createdAt = time.Unix(job.CreatedAt, 0)
return nil
}

Expand Down
Loading

0 comments on commit e2b767c

Please sign in to comment.