Skip to content

Commit

Permalink
break: Move queue name into Handler
Browse files Browse the repository at this point in the history
This goal of this change is to simplify the API. Handlers and queues
have a 1:1 mapping, so functions that receive both a `queue` and a
`handler` take two parameters when they could take only one.
  • Loading branch information
acaloiaro committed Sep 15, 2023
1 parent 18b811f commit d198f3e
Show file tree
Hide file tree
Showing 16 changed files with 102 additions and 91 deletions.
17 changes: 8 additions & 9 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified

return
}

Expand Down Expand Up @@ -121,22 +120,22 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
}

// Start starts processing jobs with the specified queue and handler
func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity := h.QueueCapacity
if queueCapacity == emptyCapacity {
queueCapacity = defaultMemQueueCapacity
}

m.handlers.Store(queue, h)
m.queues.Store(queue, make(chan *jobs.Job, queueCapacity))
m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

ctx, cancel := context.WithCancel(ctx)

m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()

err = m.start(ctx, queue)
err = m.start(ctx, h.Queue)
if err != nil {
return
}
Expand Down Expand Up @@ -164,8 +163,9 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue

err = m.Start(ctx, queue, h)
err = m.Start(ctx, h)
if err != nil {
return fmt.Errorf("error processing queue '%s': %w", queue, err)
}
Expand Down Expand Up @@ -203,13 +203,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

if ht, ok = m.handlers.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", queue)
m.logger.Error("error loading handler for queue", "queue", queue)
return
}

if qc, ok = m.queues.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
m.logger.Error("error loading channel for queue", queue)
m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue)
return err
}

Expand Down
22 changes: 11 additions & 11 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBasicJobProcessing(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -126,12 +126,12 @@ func TestBackendConfiguration(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
time.Sleep(100 * time.Millisecond)
return
}, handler.Concurrency(1), handler.MaxQueueCapacity(1))

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -184,11 +184,11 @@ func TestFutureJobScheduling(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(ctx context.Context) (err error) {
h := handler.New(queue, func(ctx context.Context) (err error) {
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -232,7 +232,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
done1 := make(chan bool)
done2 := make(chan bool)

h1 := handler.New(func(ctx context.Context) (err error) {
h1 := handler.New(q1, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -247,7 +247,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

h2 := handler.New(func(ctx context.Context) (err error) {
h2 := handler.New(q2, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -262,11 +262,11 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

if err := nq.Start(ctx, q1, h1); err != nil {
if err := nq.Start(ctx, h1); err != nil {
t.Fatal(err)
}

if err := nq.Start(ctx, q2, h2); err != nil {
if err := nq.Start(ctx, h2); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -339,7 +339,7 @@ func TestCron(t *testing.T) {
defer nq.Shutdown(ctx)

done := make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
h := handler.New("foobar", func(ctx context.Context) (err error) {
done <- true
return
})
Expand Down
34 changes: 17 additions & 17 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,18 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}

// Start starts processing jobs with the specified queue and handler
func (p *PgBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
ctx, cancel := context.WithCancel(ctx)

p.logger.Debug("starting job processing", "queue", queue)
p.logger.Debug("starting job processing", "queue", h.Queue)
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.handlers[queue] = h
p.handlers[h.Queue] = h
p.mu.Unlock()

err = p.start(ctx, queue)
err = p.start(ctx, h)
if err != nil {
p.logger.Error("unable to start processing queue", "queue", queue, "error", err)
p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err)
return
}
return
Expand All @@ -339,8 +339,9 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
}

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
ctx, cancel := context.WithCancel(ctx)
h.Queue = queue

ctx, cancel := context.WithCancel(ctx)
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.mu.Unlock()
Expand All @@ -358,7 +359,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
return fmt.Errorf("error adding cron: %w", err)
}

return p.Start(ctx, queue, h)
return p.Start(ctx, h)
}

// SetLogger sets this backend's logger
Expand Down Expand Up @@ -505,24 +506,23 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {

// start starts processing new, pending, and future jobs
// nolint: cyclop
func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
var h handler.Handler
func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
var ok bool

if h, ok = p.handlers[queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
if h, ok = p.handlers[h.Queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs
listenJobChan, ready := p.listen(ctx, h.Queue) // listen for 'new' jobs
defer close(ready)

pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup*
pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

// wait for the listener to connect and be ready to listen
<-ready

// process all future jobs and retries
go func() { p.scheduleFutureJobs(ctx, queue) }()
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()

for i := 0; i < h.Concurrency; i++ {
go func() {
Expand Down Expand Up @@ -714,7 +714,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
go func(ctx context.Context) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new listener connnection", err)
p.logger.Error("unable to acquire new listener connnection", "error", err)
return
}
defer p.release(ctx, conn, queue)
Expand All @@ -723,7 +723,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", err)
p.logger.Error("unable to configure listener connection", "error", err)
return
}

Expand All @@ -737,7 +737,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
return
}

p.logger.Error("failed to wait for notification", waitErr)
p.logger.Error("failed to wait for notification", "error", waitErr)
continue
}

Expand Down
20 changes: 10 additions & 10 deletions backends/postgres/postgres_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,19 @@ func TestBasicJobProcessing(t *testing.T) {
return
}

ctx := context.TODO()
ctx := context.Background()
nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
if err != nil {
t.Fatal(err)
}
defer nq.Shutdown(ctx)

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

err = nq.Start(ctx, queue, h)
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -184,22 +184,22 @@ func TestBasicJobMultipleQueue(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

h2 := handler.New(func(_ context.Context) (err error) {
h2 := handler.New(queue2, func(_ context.Context) (err error) {
done <- true
return
})

err = nq.Start(ctx, queue, h)
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}

err = nq.Start(ctx, queue2, h2)
err = nq.Start(ctx, h2)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestCron(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(ctx context.Context) (err error) {
h := handler.NewPeriodic(func(ctx context.Context) (err error) {
done <- true
return
})
Expand Down Expand Up @@ -318,14 +318,14 @@ func TestBasicJobProcessingWithErrors(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
err = errors.New("something bad happened") // nolint: goerr113
return
})

nq.SetLogger(testutils.NewTestLogger(logsChan))

err = nq.Start(ctx, queue, h)
err = nq.Start(ctx, h)
if err != nil {
t.Error(err)
}
Expand Down
14 changes: 5 additions & 9 deletions backends/redis/redis_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption {

// Enqueue queues jobs to be executed asynchronously
func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) {
if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
}

err = jobs.FingerprintJob(job)
if err != nil {
return
Expand All @@ -205,8 +200,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string
}

// Start starts processing jobs with the specified queue and handler
func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) {
b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) {
func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) {
b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) {
taskID := t.ResultWriter().TaskID()
var p map[string]any
if err = json.Unmarshal(t.Payload(), &p); err != nil {
Expand All @@ -226,7 +221,7 @@ func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler)

job := &jobs.Job{
CreatedAt: time.Now().UTC(),
Queue: queue,
Queue: h.Queue,
Payload: p,
Deadline: &ti.Deadline,
RunAfter: ti.NextProcessAt,
Expand Down Expand Up @@ -259,8 +254,9 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler
}

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
h.Queue = queue

err = b.Start(ctx, queue, h)
err = b.Start(ctx, h)
if err != nil {
return
}
Expand Down
Loading

0 comments on commit d198f3e

Please sign in to comment.