diff --git a/pool.go b/pool.go
index d3fa48f..07ba96e 100644
--- a/pool.go
+++ b/pool.go
@@ -9,7 +9,6 @@ import (
 	"github.com/ethereum/go-ethereum/log"
 	"gorm.io/gorm"
 	"runtime/debug"
-	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -34,13 +33,12 @@ type Pool struct {
 
 	// JobChan receives new job
 	JobChan        chan JobHandler
+	RetryJobChan   chan JobHandler
 	SuccessJobChan chan JobHandler
 	FailedJobChan  chan JobHandler
 	PrepareJobChan chan JobHandler
 
-	jobId         int32
-	processedJobs sync.Map
-
+	jobId        int32
 	MaxQueueSize int
 
 	store stores.MainStore
@@ -70,6 +68,7 @@ func NewPool(ctx context.Context, cfg *Config, db *gorm.DB, workers []Worker) *P
 	pool.PrepareJobChan = make(chan JobHandler, pool.MaxQueueSize)
 	pool.SuccessJobChan = make(chan JobHandler, pool.MaxQueueSize)
 	pool.FailedJobChan = make(chan JobHandler, pool.MaxQueueSize)
+	pool.RetryJobChan = make(chan JobHandler, pool.MaxQueueSize)
 	pool.Queue = make(chan chan JobHandler, pool.MaxQueueSize)
 
 	if workers != nil {
@@ -111,14 +110,20 @@ func (p *Pool) startWorker(w Worker) {
 				continue
 			}
 			log.Info("processing job", "id", job.GetID(), "nextTry", job.GetNextTry(), "retryCount", job.GetRetryCount(), "type", job.GetType())
-			if job.GetNextTry() == 0 || job.GetNextTry() <= time.Now().Unix() {
-				metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, 1)
-				w.ProcessJob(job)
-				metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, -1)
-				continue
+			metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, 1)
+			if err := w.ProcessJob(job); err != nil {
+				// update try and next retry time
+				if job.GetRetryCount()+1 > job.GetMaxTry() {
+					log.Info("[Pool][processJob] job reaches its maxTry", "jobTransaction", job.GetTransaction().GetHash().Hex())
+					p.FailedJobChan <- job
+					continue
+				}
+				job.IncreaseRetryCount()
+				job.UpdateNextTry(time.Now().Unix() + int64(job.GetRetryCount()*job.GetBackOff()))
+				// send to retry job chan
+				p.RetryJobChan <- job
 			}
-			// push the job back to mainChan
-			w.PoolChannel() <- job
+			metrics.Pusher.IncrGauge(metrics.ProcessingJobMetric, -1)
 		case <-w.Context().Done():
 			w.Close()
 			return
@@ -140,6 +145,8 @@ func (p *Pool) Start(closeFunc func()) {
 			p.processSuccessJob(job)
 		case job := <-p.FailedJobChan:
 			p.processFailedJob(job)
+		case job := <-p.RetryJobChan:
+			go p.PrepareRetryableJob(job)
 		case job := <-p.PrepareJobChan:
 			if job == nil {
 				continue
@@ -156,11 +163,6 @@
 			if job == nil {
 				continue
 			}
-			// get 1 workerCh from queue and push job to this channel
-			hash := job.Hash()
-			if _, ok := p.processedJobs.LoadOrStore(hash, struct{}{}); ok {
-				continue
-			}
 			log.Info("[Pool] jobChan received a job", "jobId", job.GetID(), "nextTry", job.GetNextTry(), "type", job.GetType())
 			workerCh := <-p.Queue
 			workerCh <- job
@@ -185,6 +187,19 @@
 		}
 	}
 
+	for {
+		log.Info("checking retrying jobs")
+		if len(p.RetryJobChan) == 0 {
+			break
+		}
+		job, more := <-p.RetryJobChan
+		if !more {
+			break
+		}
+		// update job
+		p.updateRetryingJob(job)
+	}
+
 	// update all success jobs
 	for {
 		log.Info("checking successJobChan")
@@ -216,8 +231,9 @@
 	close(p.JobChan)
 	close(p.SuccessJobChan)
 	close(p.FailedJobChan)
+	close(p.RetryJobChan)
 	close(p.Queue)
-
+	log.Info("finish closing pool")
 	// send signal to stop the program
 	p.stop <- struct{}{}
 
@@ -226,32 +242,43 @@
 	}
 }
 
+func (p *Pool) PrepareRetryableJob(job JobHandler) {
+	dur := time.Until(time.Unix(job.GetNextTry(), 0))
+	if dur <= 0 {
+		return
+	}
+	timer := time.NewTimer(dur)
+	select {
+	case <-timer.C:
+		p.PrepareJobChan <- job
+	case <-p.ctx.Done():
+		p.updateRetryingJob(job)
+	}
+}
+
 // PrepareJob saves new job to database
 func (p *Pool) PrepareJob(job JobHandler) error {
 	if job == nil {
 		return nil
 	}
-	// deduplication: get hash from data and type and check if it exists in `processedJobs` or not.
-	hash := p.utils.RlpHash(struct {
-		Data []byte
-		Type int
-	}{
-		Data: job.GetData(),
-		Type: job.GetType(),
-	})
-	log.Info("[PrepareJobChan] preparing new job", "job", job.String(), "hash", hash.Hex())
-	if _, ok := p.processedJobs.Load(hash); ok {
-		return nil
-	}
 	// save job to db if id = 0
 	if job.GetID() == 0 {
 		return job.Save()
 	}
-	// cache above hash to `processedJobs`
-	p.processedJobs.Store(hash, struct{}{})
 	return nil
 }
 
+func (p *Pool) updateRetryingJob(job JobHandler) {
+	if job == nil {
+		return
+	}
+	log.Info("process retryable job", "id", job.GetID())
+	if err := job.Update(stores.STATUS_PENDING); err != nil {
+		log.Error("[Pool] failed on updating retrying job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex())
+		return
+	}
+}
+
 // processSuccessJob updates job's status to `done` to database
 func (p *Pool) processSuccessJob(job JobHandler) {
 	if job == nil {
@@ -261,8 +288,6 @@
 	log.Info("process job success", "id", job.GetID())
 	if err := job.Update(stores.STATUS_DONE); err != nil {
 		log.Error("[Pool] failed on updating success job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex())
-		// send back job to successJobChan
-		p.SuccessJobChan <- job
 		return
 	}
 }
@@ -276,8 +301,6 @@
 	log.Info("process job failed", "id", job.GetID())
 	if err := job.Update(stores.STATUS_FAILED); err != nil {
 		log.Error("[Pool] failed on updating failed job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex())
-		// send back job to failedJobChan
-		p.FailedJobChan <- job
 		return
 	}
 }
diff --git a/worker.go b/worker.go
index a8f6245..74d4ed3 100644
--- a/worker.go
+++ b/worker.go
@@ -7,13 +7,12 @@ import (
 	"github.com/axieinfinity/bridge-core/utils"
 	"github.com/ethereum/go-ethereum/log"
 	"sync/atomic"
-	"time"
 )
 
 type Worker interface {
 	Context() context.Context
 	Close()
-	ProcessJob(job JobHandler)
+	ProcessJob(job JobHandler) error
 	IsClose() bool
 	Channel() chan JobHandler
 	PoolChannel() chan<- JobHandler
@@ -63,36 +62,19 @@ func (w *BridgeWorker) String() string {
 	return fmt.Sprintf("{ id: %d, currentSize: %d }", w.id, len(w.workerChan))
 }
 
-func (w *BridgeWorker) ProcessJob(job JobHandler) {
-	var (
-		err error
-		val []byte
-	)
-
-	val, err = job.Process()
+func (w *BridgeWorker) ProcessJob(job JobHandler) error {
+	val, err := job.Process()
 	if err != nil {
 		log.Error("[BridgeWorker] failed while processing job", "id", job.GetID(), "err", err)
-		goto ERROR
+		metrics.Pusher.IncrCounter(metrics.ProcessedFailedJobMetric, 1)
+		return err
 	}
 	if job.GetType() == ListenHandler && job.GetSubscriptionName() != "" {
 		job.GetListener().SendCallbackJobs(w.listeners, job.GetSubscriptionName(), job.GetTransaction(), val)
 	}
 	metrics.Pusher.IncrCounter(metrics.ProcessedSuccessJobMetric, 1)
 	w.successChan <- job
-	return
-ERROR:
-	metrics.Pusher.IncrCounter(metrics.ProcessedFailedJobMetric, 1)
-
-	if job.GetRetryCount()+1 > job.GetMaxTry() {
-		log.Info("[BridgeWorker][processJob] job reaches its maxTry", "jobTransaction", job.GetTransaction().GetHash().Hex())
-		w.failedChan <- job
-		return
-	}
-	job.IncreaseRetryCount()
-	job.UpdateNextTry(time.Now().Unix() + int64(job.GetRetryCount()*job.GetBackOff()))
-	// push the job back to mainChan
-	w.mainChan <- job
-	log.Info("[BridgeWorker][processJob] job failed, added back to jobChan", "id", job.GetID(), "retryCount", job.GetRetryCount(), "nextTry", job.GetNextTry())
+	return nil
 }
 
 func (w *BridgeWorker) Context() context.Context {