From f3c553ad4fa0b0710f624e84ac1783ae4541f652 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Fri, 15 Sep 2023 16:26:23 +0200 Subject: [PATCH] break: Move queue name into Handler --- backends/memory/memory_backend.go | 17 +++++----- backends/memory/memory_backend_test.go | 22 ++++++------ backends/postgres/postgres_backend.go | 34 +++++++++---------- backends/postgres/postgres_backend_test.go | 20 +++++------ backends/redis/redis_backend.go | 14 +++----- backends/redis/redis_backend_test.go | 22 ++++++------ .../add_job_with_custom_concurrency/main.go | 4 +-- examples/add_job_with_deadline/main.go | 4 +-- examples/add_job_with_timeout/main.go | 4 +-- examples/add_periodic_jobs/main.go | 2 +- examples/add_postgres_job/main.go | 4 +-- examples/add_redis_job/main.go | 3 +- examples/start_processing_jobs/main.go | 4 +-- handler/handler.go | 25 +++++++++++--- neoq.go | 4 +-- neoq_test.go | 10 +++--- 16 files changed, 102 insertions(+), 91 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index c82465c..adb88c3 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -89,7 +89,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, if job.Queue == "" { err = jobs.ErrNoQueueSpecified - return } @@ -121,14 +120,14 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, } // Start starts processing jobs with the specified queue and handler -func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { +func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) { queueCapacity := h.QueueCapacity if queueCapacity == emptyCapacity { queueCapacity = defaultMemQueueCapacity } - m.handlers.Store(queue, h) - m.queues.Store(queue, make(chan *jobs.Job, queueCapacity)) + m.handlers.Store(h.Queue, h) + m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity)) ctx, cancel := context.WithCancel(ctx) @@ -136,7 +135,7 @@ func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) m.cancelFuncs = append(m.cancelFuncs, cancel) m.mu.Unlock() - err = m.start(ctx, queue) + err = m.start(ctx, h.Queue) if err != nil { return } @@ -164,8 +163,9 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H m.mu.Lock() m.cancelFuncs = append(m.cancelFuncs, cancel) m.mu.Unlock() + h.Queue = queue - err = m.Start(ctx, queue, h) + err = m.Start(ctx, h) if err != nil { return fmt.Errorf("error processing queue '%s': %w", queue, err) } @@ -203,13 +203,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { if ht, ok = m.handlers.Load(queue); !ok { err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) - m.logger.Error("error loading handler for queue", queue) + m.logger.Error("error loading handler for queue", "queue", queue) return } if qc, ok = m.queues.Load(queue); !ok { - err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue) - m.logger.Error("error loading channel for queue", queue) + m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue) return err } diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index b2fadeb..b2d1c94 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -52,12 +52,12 @@ func TestBasicJobProcessing(t *testing.T) { t.Fatal(err) } - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -126,12 +126,12 @@ func TestBackendConfiguration(t *testing.T) { t.Fatal(err) } - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(100 * time.Millisecond) return }, handler.Concurrency(1), handler.MaxQueueCapacity(1)) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -184,11 +184,11 @@ func TestFutureJobScheduling(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { return }) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -232,7 +232,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { done1 := make(chan bool) done2 := make(chan bool) - h1 := handler.New(func(ctx context.Context) (err error) { + h1 := handler.New(q1, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) if err != nil { @@ -247,7 +247,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { return }) - h2 := handler.New(func(ctx context.Context) (err error) { + h2 := handler.New(q2, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) if err != nil { @@ -262,11 +262,11 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { return }) - if err := nq.Start(ctx, q1, h1); err != nil { + if err := nq.Start(ctx, h1); err != nil { t.Fatal(err) } - if err := nq.Start(ctx, q2, h2); err != nil { + if err := nq.Start(ctx, h2); err != nil { t.Fatal(err) } @@ -339,7 +339,7 @@ func TestCron(t *testing.T) { defer nq.Shutdown(ctx) done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New("foobar", func(ctx context.Context) (err error) { done <- true return }) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e79b039..bdfc9fa 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -305,18 +305,18 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } // Start starts processing jobs with the specified queue and handler -func (p *PgBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { +func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { ctx, cancel := context.WithCancel(ctx) - p.logger.Debug("starting job processing", "queue", queue) + p.logger.Debug("starting job processing", "queue", h.Queue) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) - p.handlers[queue] = h + p.handlers[h.Queue] = h p.mu.Unlock() - err = p.start(ctx, queue) + err = p.start(ctx, h) if err != nil { - p.logger.Error("unable to start processing queue", "queue", queue, "error", err) + p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err) return } return @@ -339,8 +339,9 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha } queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) - ctx, cancel := context.WithCancel(ctx) + h.Queue = queue + ctx, cancel := context.WithCancel(ctx) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) p.mu.Unlock() @@ -358,7 +359,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha return fmt.Errorf("error adding cron: %w", err) } - return p.Start(ctx, queue, h) + return p.Start(ctx, h) } // SetLogger sets this backend's logger @@ -505,24 +506,23 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { // start starts processing new, pending, and future jobs // nolint: cyclop -func (p *PgBackend) start(ctx context.Context, queue string) (err error) { - var h handler.Handler +func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { var ok bool - if h, ok = p.handlers[queue]; !ok { - return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) + if h, ok = p.handlers[h.Queue]; !ok { + return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue) } - listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs + listenJobChan, ready := p.listen(ctx, h.Queue) // listen for 'new' jobs defer close(ready) - pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup* + pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup* // wait for the listener to connect and be ready to listen <-ready // process all future jobs and retries - go func() { p.scheduleFutureJobs(ctx, queue) }() + go func() { p.scheduleFutureJobs(ctx, h.Queue) }() for i := 0; i < h.Concurrency; i++ { go func() { @@ -714,7 +714,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connnection", err) + p.logger.Error("unable to acquire new listener connnection", "error", err) return } defer p.release(ctx, conn, queue) @@ -723,7 +723,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection", err) + p.logger.Error("unable to configure listener connection", "error", err) return } @@ -737,7 +737,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re return } - p.logger.Error("failed to wait for notification", waitErr) + p.logger.Error("failed to wait for notification", "error", waitErr) continue } diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 5eea8c3..3950083 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -70,19 +70,19 @@ func TestBasicJobProcessing(t *testing.T) { return } - ctx := context.TODO() + ctx := context.Background() nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -184,22 +184,22 @@ func TestBasicJobMultipleQueue(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - h2 := handler.New(func(_ context.Context) (err error) { + h2 := handler.New(queue2, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue2, h2) + err = nq.Start(ctx, h2) if err != nil { t.Error(err) } @@ -264,7 +264,7 @@ func TestCron(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(ctx context.Context) (err error) { done <- true return }) @@ -318,14 +318,14 @@ func TestBasicJobProcessingWithErrors(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { err = errors.New("something bad happened") // nolint: goerr113 return }) nq.SetLogger(testutils.NewTestLogger(logsChan)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index a2199fe..fc16212 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -180,11 +180,6 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption { // Enqueue queues jobs to be executed asynchronously func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { - if job.Queue == "" { - err = jobs.ErrNoQueueSpecified - return - } - err = jobs.FingerprintJob(job) if err != nil { return @@ -205,8 +200,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string } // Start starts processing jobs with the specified queue and handler -func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) { - b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) { +func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { + b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) { taskID := t.ResultWriter().TaskID() var p map[string]any if err = json.Unmarshal(t.Payload(), &p); err != nil { @@ -226,7 +221,7 @@ func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) job := &jobs.Job{ CreatedAt: time.Now().UTC(), - Queue: queue, + Queue: h.Queue, Payload: p, Deadline: &ti.Deadline, RunAfter: ti.NextProcessAt, @@ -259,8 +254,9 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler } queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) + h.Queue = queue - err = b.Start(ctx, queue, h) + err = b.Start(ctx, h) if err != nil { return } diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index 39c91b5..eb487a8 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -86,12 +86,12 @@ func TestBasicJobProcessing(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -143,22 +143,22 @@ func TestBasicJobMultipleQueue(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - h2 := handler.New(func(_ context.Context) (err error) { + h2 := handler.New(queue2, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue2, h2) + err = nq.Start(ctx, h2) if err != nil { t.Error(err) } @@ -221,7 +221,7 @@ func TestStartCron(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.NewPeriodic(func(_ context.Context) (err error) { done <- true return }) @@ -268,7 +268,7 @@ func TestJobProcessingWithOptions(t *testing.T) { nq.SetLogger(testutils.NewTestLogger(logsChan)) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(50 * time.Millisecond) return }) @@ -277,7 +277,7 @@ func TestJobProcessingWithOptions(t *testing.T) { handler.Concurrency(1), ) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -334,13 +334,13 @@ func TestJobProcessingWithJobDeadline(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(50 * time.Millisecond) done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } diff --git a/examples/add_job_with_custom_concurrency/main.go b/examples/add_job_with_custom_concurrency/main.go index b41554c..05c8dc6 100644 --- a/examples/add_job_with_custom_concurrency/main.go +++ b/examples/add_job_with_custom_concurrency/main.go @@ -25,7 +25,7 @@ func main() { // Concurrency and other options may be set on handlers both during creation (Option 1), or after the fact (Option 2) // Option 1: add options when creating the handler - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) @@ -36,7 +36,7 @@ func main() { // Option 2: Set options after the handler is created h.WithOptions(handler.Concurrency(8)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_deadline/main.go index 783b076..4c23850 100644 --- a/examples/add_job_with_deadline/main.go +++ b/examples/add_job_with_deadline/main.go @@ -25,12 +25,12 @@ func main() { // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons done := make(chan bool) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { <-done return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_job_with_timeout/main.go b/examples/add_job_with_timeout/main.go index 425309e..4733209 100644 --- a/examples/add_job_with_timeout/main.go +++ b/examples/add_job_with_timeout/main.go @@ -28,7 +28,7 @@ func main() { // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) j, err = jobs.FromContext(ctx) @@ -40,7 +40,7 @@ func main() { // this 10ms timeout will cause our job that sleeps for 1s to fail h.WithOptions(handler.JobTimeout(10 * time.Millisecond)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_periodic_jobs/main.go b/examples/add_periodic_jobs/main.go index a95774f..a40d6bc 100644 --- a/examples/add_periodic_jobs/main.go +++ b/examples/add_periodic_jobs/main.go @@ -18,7 +18,7 @@ func main() { } // run a job periodically - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(_ context.Context) (err error) { log.Println("running periodic job") return }) diff --git a/examples/add_postgres_job/main.go b/examples/add_postgres_job/main.go index e0078e9..cd2810b 100644 --- a/examples/add_postgres_job/main.go +++ b/examples/add_postgres_job/main.go @@ -24,7 +24,7 @@ func main() { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) j, err = jobs.FromContext(ctx) @@ -33,7 +33,7 @@ func main() { return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_redis_job/main.go b/examples/add_redis_job/main.go index 889b66e..08eae44 100644 --- a/examples/add_redis_job/main.go +++ b/examples/add_redis_job/main.go @@ -11,7 +11,6 @@ import ( ) func main() { - done := make(chan bool) ctx := context.Background() nq, _ := neoq.New(ctx, @@ -20,7 +19,7 @@ func main() { redis.WithPassword(""), ) - nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + nq.Start(ctx, handler.New("hello_world", func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true diff --git a/examples/start_processing_jobs/main.go b/examples/start_processing_jobs/main.go index b192c7f..6d0e57c 100644 --- a/examples/start_processing_jobs/main.go +++ b/examples/start_processing_jobs/main.go @@ -20,14 +20,14 @@ func main() { log.Fatalf("error initializing postgres backend: %v", err) } - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error processing queue", err) } diff --git a/handler/handler.go b/handler/handler.go index d8c3bda..9e76ab2 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -30,6 +30,7 @@ type Handler struct { Concurrency int JobTimeout time.Duration QueueCapacity int64 + Queue string } // Option is function that sets optional configuration for Handlers @@ -67,10 +68,19 @@ func MaxQueueCapacity(capacity int64) Option { } } -// New creates a new queue handler -func New(f Func, opts ...Option) (h Handler) { +// Queue configures the name of the queue that the handler runs on +func Queue(queue string) Option { + return func(h *Handler) { + h.Queue = queue + } +} + +// New creates new queue handlers for specific queues. This function is to be usued to create new Handlers for +// non-periodic jobs (most jobs). Use [NewPeriodic] to initialize handlers for periodic jobs. +func New(queue string, f Func, opts ...Option) (h Handler) { h = Handler{ Handle: f, + Queue: queue, } h.WithOptions(opts...) @@ -88,13 +98,20 @@ func New(f Func, opts ...Option) (h Handler) { return } +// NewPeriodic creates new queue handlers for periodic jobs. Use [New] to initialize handlers for non-periodic jobs. +func NewPeriodic(f Func, opts ...Option) (h Handler) { + h = New("", f, opts...) + return +} + // Exec executes handler functions with a concrete timeout func Exec(ctx context.Context, handler Handler) (err error) { timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout) defer cancel() - var errCh = make(chan error, 1) - var done = make(chan bool) + errCh := make(chan error, 1) + done := make(chan bool) + go func(ctx context.Context) { defer func() { if x := recover(); x != nil { diff --git a/neoq.go b/neoq.go index 2a30d80..b79397c 100644 --- a/neoq.go +++ b/neoq.go @@ -63,8 +63,8 @@ type Neoq interface { // Enqueue queues jobs to be executed asynchronously Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) - // Start starts processing jobs with the specified queue and handler - Start(ctx context.Context, queue string, h handler.Handler) (err error) + // Start starts processing jobs on the queue specified in the Handler + Start(ctx context.Context, h handler.Handler) (err error) // StartCron starts processing jobs with the specified cron schedule and handler // diff --git a/neoq_test.go b/neoq_test.go index 53b3a54..71e80f3 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -102,7 +102,7 @@ func TestStart(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { done <- true return }) @@ -112,7 +112,7 @@ func TestStart(t *testing.T) { ) // process jobs on the test queue - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -165,7 +165,7 @@ func TestStartCron(t *testing.T) { defer nq.Shutdown(ctx) done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(ctx context.Context) (err error) { done <- true return }) @@ -208,14 +208,14 @@ func TestSetLogger(t *testing.T) { nq.SetLogger(testutils.NewTestLogger(logsChan)) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { err = errTrigger return }) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) }