diff --git a/controller.go b/controller.go index c73c2d8..79bd8f1 100644 --- a/controller.go +++ b/controller.go @@ -266,6 +266,9 @@ func (c *Controller) Start() error { case job := <-c.FailedJobChan: c.processFailedJob(job) case job := <-c.PrepareJobChan: + if job == nil { + continue + } // add new job to database before processing if err := c.prepareJob(job); err != nil { log.Error("[Controller] failed on preparing job", "err", err, "jobType", job.GetType(), "tx", job.GetTransaction().GetHash().Hex()) @@ -281,10 +284,9 @@ func (c *Controller) Start() error { } // get 1 workerCh from queue and push job to this channel hash := job.Hash() - if _, ok := c.processedJobs.Load(hash); ok { + if _, ok := c.processedJobs.LoadOrStore(hash, struct{}{}); ok { continue } - c.processedJobs.Store(hash, struct{}{}) log.Info("[Controller] jobChan received a job", "jobId", job.GetID(), "nextTry", job.GetNextTry(), "type", job.GetType()) workerCh := <-c.Queue workerCh <- job diff --git a/types.go b/types.go index cb3e295..c2e3abc 100644 --- a/types.go +++ b/types.go @@ -141,21 +141,11 @@ type Config struct { Listeners map[string]*LsConfig `json:"listeners"` NumberOfWorkers int `json:"numberOfWorkers"` DB *stores.Database `json:"database"` - Cleaner Cleaner `json:"cleaner"` // this field is used for testing purpose Testing bool } -type Cleaner map[string]*CleanerConfig - -type CleanerConfig struct { - Cron string `json:"cron"` - RemoveAfter uint64 `json:"removeAfter"` - SkipIfLessThan uint64 `json:"SkipIfLessThan"` - Description string `json:"description"` -} - type LsConfig struct { ChainId string `json:"chainId"` Name string `json:"-"`