diff --git a/benchmark/bridge_core_test.go b/benchmark/bridge_core_test.go new file mode 100644 index 0000000..14192e6 --- /dev/null +++ b/benchmark/bridge_core_test.go @@ -0,0 +1,214 @@ +package benchmark + +import ( + "context" + "fmt" + bridge_core "github.com/axieinfinity/bridge-core" + "github.com/axieinfinity/bridge-core/models" + "github.com/axieinfinity/bridge-core/stores" + "github.com/ethereum/go-ethereum/common" + "gorm.io/gorm" + "math/big" + "math/rand" + "os" + "strconv" + "testing" + "time" +) + +var testData = make([]string, 0) + +func init() { + dataLength, _ := strconv.Atoi(os.Getenv("size")) + for i := 0; i < dataLength; i++ { + testData = append(testData, fmt.Sprintf("%v", rand.Uint32())) + } +} + +type Job struct { + store stores.MainStore + id int32 + jobType int + + retryCount int + maxTry int + nextTry int64 + backOff int + + data []byte +} + +func NewJob(id int32, store stores.MainStore, data []byte) *Job { + return &Job{ + id: id, + retryCount: 0, + maxTry: 20, + backOff: 5, + store: store, + data: data, + } +} + +func (e *Job) FromChainID() *big.Int { + return nil +} + +func (e *Job) GetID() int32 { + return e.id +} + +func (e *Job) GetType() int { + return e.jobType +} + +func (e *Job) GetRetryCount() int { + return e.retryCount +} + +func (e *Job) GetNextTry() int64 { + return e.nextTry +} + +func (e *Job) GetMaxTry() int { + return e.maxTry +} + +func (e *Job) GetData() []byte { + return e.data +} + +func (e *Job) GetValue() *big.Int { + return nil +} + +func (e *Job) GetBackOff() int { + return e.backOff +} + +func (e *Job) Process() ([]byte, error) { + return nil, nil +} + +func (e *Job) String() string { + return fmt.Sprintf("{Type:%v, Subscription:%v, RetryCount: %v}", e.GetType(), e.GetSubscriptionName(), e.GetRetryCount()) +} + +func (e *Job) Hash() common.Hash { + return common.BytesToHash([]byte(fmt.Sprintf("j-%d-%d-%d", e.id, e.retryCount, e.nextTry))) +} + +func (e *Job) IncreaseRetryCount() { + e.retryCount++ +} +func (e *Job) UpdateNextTry(nextTry int64) { + e.nextTry = nextTry +} + +func (e *Job) GetListener() bridge_core.Listener { + return nil +} + +func (e *Job) GetSubscriptionName() string { + return "" +} + +func (e *Job) GetTransaction() bridge_core.Transaction { + return nil +} + +func (e *Job) Save() error { + //job := &models.Job{ + // Listener: "", + // SubscriptionName: e.GetSubscriptionName(), + // Type: e.GetType(), + // RetryCount: e.retryCount, + // Status: stores.STATUS_PENDING, + // Data: common.Bytes2Hex(e.GetData()), + // Transaction: "", + // CreatedAt: time.Now().Unix(), + // FromChainId: "", + //} + //if err := e.store.GetJobStore().Save(job); err != nil { + // return err + //} + //e.id = int32(job.ID) + return nil +} + +func (e *Job) Update(status string) error { + job := &models.Job{ + Listener: "", + SubscriptionName: e.GetSubscriptionName(), + Type: e.GetType(), + RetryCount: e.retryCount, + Status: status, + Data: common.Bytes2Hex(e.GetData()), + Transaction: "", + CreatedAt: time.Now().Unix(), + FromChainId: "", + } + if err := e.store.GetJobStore().Save(job); err != nil { + return err + } + e.id = int32(job.ID) + return nil +} + +func (e *Job) SetID(id int32) { + e.id = id +} + +func (e *Job) CreatedAt() time.Time { + return time.Now() +} + +func addWorkers(ctx context.Context, pool *bridge_core.Pool, cfg *bridge_core.Config) { + var workers []bridge_core.Worker + for i := 0; i < cfg.NumberOfWorkers; i++ { + workers = append(workers, bridge_core.NewWorker(ctx, i, pool.PrepareJobChan, pool.FailedJobChan, pool.Queue, pool.MaxQueueSize, nil)) + } + pool.AddWorkers(workers) +} + +func newPool(ctx context.Context, db *gorm.DB, numberOfWorkers int) *bridge_core.Pool { + bridgeCnf := &bridge_core.Config{NumberOfWorkers: numberOfWorkers} + pool := bridge_core.NewPool(ctx, bridgeCnf, db, nil) + addWorkers(ctx, pool, bridgeCnf) + return pool +} + +func BenchmarkPool(b *testing.B) { + dbCfg := &stores.Database{ + Host: "localhost", + User: "postgres", + Password: "example", + DBName: "bench_mark_db", + Port: 5432, + ConnMaxLifetime: 200, + MaxIdleConns: 200, + MaxOpenConns: 200, + } + // init db based on config + db, err := stores.MustConnectDatabase(dbCfg, false) + if err != nil { + panic(err) + } + db.AutoMigrate(&models.Job{}) + ctx, cancel := context.WithCancel(context.Background()) + pool := newPool(ctx, db, 8192) + go pool.Start(nil) + + store := stores.NewMainStore(db) + now := time.Now() + for _, data := range testData { + pool.PrepareJobChan <- NewJob(0, store, []byte(data)) + } + for { + if len(pool.PrepareJobChan) == 0 && len(pool.JobChan) == 0 && len(pool.FailedJobChan) == 0 && len(pool.RetryJobChan) == 0 { + cancel() + pool.Wait() + break + } + } + println(fmt.Sprintf("total time: %d (ms)", time.Now().Sub(now).Milliseconds())) +} diff --git a/controller.go b/controller.go index 3d5f738..6ff897d 100644 --- a/controller.go +++ b/controller.go @@ -139,7 +139,7 @@ func New(cfg *Config, db *gorm.DB, helpers utils.Utils) (*Controller, error) { var workers []Worker // init workers for i := 0; i < cfg.NumberOfWorkers; i++ { - w := NewWorker(ctx, i, c.Pool.PrepareJobChan, c.Pool.FailedJobChan, c.Pool.SuccessJobChan, c.Pool.Queue, c.Pool.MaxQueueSize, c.listeners) + w := NewWorker(ctx, i, c.Pool.PrepareJobChan, c.Pool.FailedJobChan, c.Pool.Queue, c.Pool.MaxQueueSize, c.listeners) workers = append(workers, w) } c.Pool.AddWorkers(workers) @@ -223,6 +223,8 @@ func (c *Controller) processPendingJobs() { // add job to jobChan if j != nil { c.Pool.PrepareJobChan <- j + job.Status = stores.STATUS_PROCESSED + c.store.GetJobStore().Update(job) } } } @@ -298,6 +300,19 @@ func (c *Controller) startListening(listener Listener, tryCount int) { return } } + + // start stats reporter + statsTick := time.NewTicker(time.Duration(2) * time.Second) + go func() { + for { + select { + case <-statsTick.C: + stats := c.Pool.Stats() + log.Info("[Controller] pool stats", "pending", stats.PendingQueue, "queue", stats.Queue) + } + } + }() + // start listening to block's events tick := time.NewTicker(listener.Period()) for { @@ -307,6 +322,11 @@ func (c *Controller) startListening(listener Listener, tryCount int) { case <-tick.C: // stop if the pool is closed if c.Pool.IsClosed() { + // stop timer + tick.Stop() + statsTick.Stop() + // wait for pool is totally shutdown + c.Pool.Wait() return } latest, err := listener.GetLatestBlockHeight() diff --git a/job.go b/job.go index 11040d3..2d11820 100644 --- a/job.go +++ b/job.go @@ -180,7 +180,6 @@ func (e *BaseJob) Save() error { func (e *BaseJob) Update(status string) error { job := &models.Job{ - ID: int(e.id), Listener: e.listener.GetName(), SubscriptionName: e.subscriptionName, Type: e.jobType, @@ -191,9 +190,11 @@ func (e *BaseJob) Update(status string) error { CreatedAt: time.Now().Unix(), FromChainId: hexutil.EncodeBig(e.fromChainID), } - if err := e.listener.GetStore().GetJobStore().Update(job); err != nil { + if err := e.listener.GetStore().GetJobStore().Save(job); err != nil { return err } + e.id = int32(job.ID) + e.createdAt = time.Unix(job.CreatedAt, 0) return nil } diff --git a/pool.go b/pool.go index 195ad7a..546bd57 100644 --- a/pool.go +++ b/pool.go @@ -2,6 +2,7 @@ package bridge_core import ( "context" + "errors" "github.com/axieinfinity/bridge-core/adapters" "github.com/axieinfinity/bridge-core/metrics" "github.com/axieinfinity/bridge-core/stores" @@ -18,6 +19,11 @@ const ( defaultMaxQueueSize = 4096 ) +type Stats struct { + PendingQueue int + Queue int +} + type Pool struct { ctx context.Context @@ -34,7 +40,6 @@ type Pool struct { // JobChan receives new job JobChan chan JobHandler RetryJobChan chan JobHandler - SuccessJobChan chan JobHandler FailedJobChan chan JobHandler PrepareJobChan chan JobHandler @@ -66,7 +71,6 @@ func NewPool(ctx context.Context, cfg *Config, db *gorm.DB, workers []Worker) *P pool.isClosed.Store(false) pool.JobChan = make(chan JobHandler, pool.MaxQueueSize*cfg.NumberOfWorkers) pool.PrepareJobChan = make(chan JobHandler, pool.MaxQueueSize) - pool.SuccessJobChan = make(chan JobHandler, pool.MaxQueueSize) pool.FailedJobChan = make(chan JobHandler, pool.MaxQueueSize) pool.RetryJobChan = make(chan JobHandler, pool.MaxQueueSize) pool.Queue = make(chan chan JobHandler, pool.MaxQueueSize) @@ -109,8 +113,14 @@ func (p *Pool) startWorker(w Worker) { if job == nil { continue } + if p.isClosed.Load().(bool) { + // update job to db + if err := job.Update(stores.STATUS_PENDING); err != nil { + log.Error("[Pool] failed on updating failed job", "err", err, "jobType", job.GetType()) + } + continue + } log.Info("processing job", "id", job.GetID(), "nextTry", job.GetNextTry(), "retryCount", job.GetRetryCount(), "type", job.GetType()) - metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, 1) if err := w.ProcessJob(job); err != nil { // update try and next retry time if job.GetRetryCount()+1 > job.GetMaxTry() { @@ -123,7 +133,6 @@ func (p *Pool) startWorker(w Worker) { // send to retry job chan p.RetryJobChan <- job } - metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, -1) case <-w.Context().Done(): w.Close() return @@ -138,11 +147,8 @@ func (p *Pool) Start(closeFunc func()) { for _, worker := range p.Workers { go p.startWorker(worker) } - for { select { - case job := <-p.SuccessJobChan: - p.processSuccessJob(job) case job := <-p.FailedJobChan: p.processFailedJob(job) case job := <-p.RetryJobChan: @@ -154,18 +160,25 @@ func (p *Pool) Start(closeFunc func()) { // add new job to database before processing if err := p.PrepareJob(job); err != nil { log.Error("[Pool] failed on preparing job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) - metrics.Pusher.IncrCounter(metrics.PreparingFailedJobMetric, 1) continue } if p.isClosed.Load().(bool) { + if err := job.Update(stores.STATUS_PENDING); err != nil { + log.Error("[Pool] failed on saving pending job", "err", err, "jobType", job.GetType()) + } continue } - metrics.Pusher.IncrCounter(metrics.PreparingSuccessJobMetric, 1) p.JobChan <- job case job := <-p.JobChan: if job == nil { continue } + if p.isClosed.Load().(bool) { + if err := job.Update(stores.STATUS_PENDING); err != nil { + log.Error("[Pool] failed on saving processing job", "err", err, "jobType", job.GetType()) + } + continue + } log.Info("[Pool] jobChan received a job", "jobId", job.GetID(), "nextTry", job.GetNextTry(), "type", job.GetType()) workerCh := <-p.Queue workerCh <- job @@ -181,7 +194,6 @@ func (p *Pool) Start(closeFunc func()) { // close all available channels to prevent further data send to pool's channels close(p.PrepareJobChan) close(p.JobChan) - close(p.SuccessJobChan) close(p.FailedJobChan) close(p.RetryJobChan) close(p.Queue) @@ -192,8 +204,8 @@ func (p *Pool) Start(closeFunc func()) { if !more { break } - if err := p.PrepareJob(job); err != nil { - log.Error("[Pool] error while storing all jobs from prepareJobChan to database in closing step", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) + if err := job.Update(stores.STATUS_PENDING); err != nil { + log.Error("[Pool] failed on updating failed job", "err", err, "jobType", job.GetType()) } } @@ -207,15 +219,6 @@ func (p *Pool) Start(closeFunc func()) { p.updateRetryingJob(job) } - for { - log.Info("checking successJobChan") - job, more := <-p.SuccessJobChan - if !more { - break - } - p.processSuccessJob(job) - } - for { log.Info("checking failedJobChan") job, more := <-p.FailedJobChan @@ -233,6 +236,13 @@ func (p *Pool) Start(closeFunc func()) { } } +func (p *Pool) Stats() Stats { + return Stats{ + PendingQueue: len(p.PrepareJobChan), + Queue: len(p.JobChan), + } +} + func (p *Pool) PrepareRetryableJob(job JobHandler) { dur := time.Until(time.Unix(job.GetNextTry(), 0)) if dur <= 0 { @@ -250,11 +260,7 @@ func (p *Pool) PrepareRetryableJob(job JobHandler) { // PrepareJob saves new job to database func (p *Pool) PrepareJob(job JobHandler) error { if job == nil { - return nil - } - // save job to db if id = 0 - if job.GetID() == 0 { - return job.Save() + return errors.New("job is nil") } return nil } @@ -263,33 +269,18 @@ func (p *Pool) updateRetryingJob(job JobHandler) { if job == nil { return } - log.Info("process retryable job", "id", job.GetID()) if err := job.Update(stores.STATUS_PENDING); err != nil { log.Error("[Pool] failed on updating retrying job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) return } } -// processSuccessJob updates job's status to `done` to database -func (p *Pool) processSuccessJob(job JobHandler) { - if job == nil { - return - } - - log.Info("process job success", "id", job.GetID()) - if err := job.Update(stores.STATUS_DONE); err != nil { - log.Error("[Pool] failed on updating success job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) - return - } -} - // processFailedJob updates job's status to `failed` to database func (p *Pool) processFailedJob(job JobHandler) { if job == nil { return } - log.Info("process job failed", "id", job.GetID()) if err := job.Update(stores.STATUS_FAILED); err != nil { log.Error("[Pool] failed on updating failed job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) return diff --git a/stores/main.go b/stores/main.go index 303c71a..fe6abd0 100644 --- a/stores/main.go +++ b/stores/main.go @@ -16,6 +16,7 @@ const ( STATUS_FAILED = "failed" STATUS_PROCESSING = "processing" STATUS_DONE = "done" + STATUS_PROCESSED = "processed" ) type Database struct { diff --git a/worker.go b/worker.go index 74d4ed3..b02b4ba 100644 --- a/worker.go +++ b/worker.go @@ -3,9 +3,9 @@ package bridge_core import ( "context" "fmt" - "github.com/axieinfinity/bridge-core/metrics" "github.com/axieinfinity/bridge-core/utils" "github.com/ethereum/go-ethereum/log" + "github.com/go-stack/stack" "sync/atomic" ) @@ -17,7 +17,6 @@ type Worker interface { Channel() chan JobHandler PoolChannel() chan<- JobHandler FailedChannel() chan<- JobHandler - SuccessChannel() chan<- JobHandler WorkersQueue() chan chan JobHandler } @@ -44,14 +43,13 @@ type BridgeWorker struct { isClose int32 } -func NewWorker(ctx context.Context, id int, mainChan, failedChan, successChan chan<- JobHandler, queue chan chan JobHandler, size int, listeners map[string]Listener) *BridgeWorker { +func NewWorker(ctx context.Context, id int, mainChan, failedChan chan<- JobHandler, queue chan chan JobHandler, size int, listeners map[string]Listener) *BridgeWorker { return &BridgeWorker{ ctx: ctx, id: id, workerChan: make(chan JobHandler, size), mainChan: mainChan, failedChan: failedChan, - successChan: successChan, queue: queue, listeners: listeners, utilWrapper: utils.NewUtils(), @@ -65,15 +63,12 @@ func (w *BridgeWorker) String() string { func (w *BridgeWorker) ProcessJob(job JobHandler) error { val, err := job.Process() if err != nil { - log.Error("[BridgeWorker] failed while processing job", "id", job.GetID(), "err", err) - metrics.Pusher.IncrCounter(metrics.ProcessedFailedJobMetric, 1) + log.Error("[BridgeWorker] failed while processing job", "id", job.GetID(), "err", err, "stack", stack.Trace().String()) return err } if job.GetType() == ListenHandler && job.GetSubscriptionName() != "" { job.GetListener().SendCallbackJobs(w.listeners, job.GetSubscriptionName(), job.GetTransaction(), val) } - metrics.Pusher.IncrCounter(metrics.ProcessedSuccessJobMetric, 1) - w.successChan <- job return nil } @@ -101,10 +96,6 @@ func (w *BridgeWorker) FailedChannel() chan<- JobHandler { return w.failedChan } -func (w *BridgeWorker) SuccessChannel() chan<- JobHandler { - return w.successChan -} - func (w *BridgeWorker) Close() { atomic.StoreInt32(&w.isClose, 1) close(w.workerChan)