From cc476d060ca7353d22e2f1cf81d6eabb35168524 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Fri, 15 Sep 2023 16:26:23 +0200 Subject: [PATCH 1/7] break: Move queue name into Handler This goal of this change is to simplify the API. Handlers and queues have a 1:1 mapping, so functions that receive both a `queue` and a `handler` take two parameters when they could take only one. --- README.md | 20 +++++------ backends/memory/memory_backend.go | 17 +++++----- backends/memory/memory_backend_test.go | 22 ++++++------ backends/postgres/postgres_backend.go | 34 +++++++++---------- backends/postgres/postgres_backend_test.go | 20 +++++------ backends/redis/redis_backend.go | 14 +++----- backends/redis/redis_backend_test.go | 22 ++++++------ .../add_job_with_custom_concurrency/main.go | 4 +-- examples/add_job_with_deadline/main.go | 4 +-- examples/add_job_with_timeout/main.go | 4 +-- examples/add_periodic_jobs/main.go | 2 +- examples/add_postgres_job/main.go | 4 +-- examples/add_redis_job/main.go | 3 +- examples/start_processing_jobs/main.go | 4 +-- handler/handler.go | 25 +++++++++++--- neoq.go | 4 +-- neoq_test.go | 10 +++--- 17 files changed, 112 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index 5b01b5e..b35f836 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter. ```go ctx := context.Background() nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend)) -nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { +nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return @@ -57,14 +57,14 @@ nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { Enqueuing adds jobs to the specified queue to be processed asynchronously. -**Example**: Add a "Hello World" job to the `hello_world` queue using the default in-memory backend. +**Example**: Add a "Hello World" job to the `greetings` queue using the default in-memory backend. ```go ctx := context.Background() nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend)) nq.Enqueue(ctx, &jobs.Job{ - Queue: "hello_world", - Payload: map[string]interface{}{ + Queue: "greetings", + Payload: map[string]any{ "message": "hello world", }, }) @@ -72,7 +72,7 @@ nq.Enqueue(ctx, &jobs.Job{ ## Redis -**Example**: Process jobs on the "hello_world" queue and add a job to it using the redis backend +**Example**: Process jobs on the "greetings" queue and add a job to it using the redis backend ```go ctx := context.Background() @@ -81,14 +81,14 @@ nq, _ := neoq.New(ctx, redis.WithAddr("localhost:6379"), redis.WithPassword("")) -nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { +nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return })) nq.Enqueue(ctx, &jobs.Job{ - Queue: "hello_world", + Queue: "greetings", Payload: map[string]interface{}{ "message": "hello world", }, @@ -97,7 +97,7 @@ nq.Enqueue(ctx, &jobs.Job{ ## Postgres -**Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend +**Example**: Process jobs on the "greetings" queue and add a job to it using the postgres backend ```go ctx := context.Background() @@ -106,14 +106,14 @@ nq, _ := neoq.New(ctx, postgres.WithConnectionString("postgres://postgres:postgres@127.0.0.1:5432/neoq"), ) -nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { +nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return })) nq.Enqueue(ctx, &jobs.Job{ - Queue: "hello_world", + Queue: "greetings", Payload: map[string]interface{}{ "message": "hello world", }, diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index c82465c..adb88c3 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -89,7 +89,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, if job.Queue == "" { err = jobs.ErrNoQueueSpecified - return } @@ -121,14 +120,14 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, } // Start starts processing jobs with the specified queue and handler -func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { +func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) { queueCapacity := h.QueueCapacity if queueCapacity == emptyCapacity { queueCapacity = defaultMemQueueCapacity } - m.handlers.Store(queue, h) - m.queues.Store(queue, make(chan *jobs.Job, queueCapacity)) + m.handlers.Store(h.Queue, h) + m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity)) ctx, cancel := context.WithCancel(ctx) @@ -136,7 +135,7 @@ func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) m.cancelFuncs = append(m.cancelFuncs, cancel) m.mu.Unlock() - err = m.start(ctx, queue) + err = m.start(ctx, h.Queue) if err != nil { return } @@ -164,8 +163,9 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H m.mu.Lock() m.cancelFuncs = append(m.cancelFuncs, cancel) m.mu.Unlock() + h.Queue = queue - err = m.Start(ctx, queue, h) + err = m.Start(ctx, h) if err != nil { return fmt.Errorf("error processing queue '%s': %w", queue, err) } @@ -203,13 +203,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { if ht, ok = m.handlers.Load(queue); !ok { err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) - m.logger.Error("error loading handler for queue", queue) + m.logger.Error("error loading handler for queue", "queue", queue) return } if qc, ok = m.queues.Load(queue); !ok { - err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue) - m.logger.Error("error loading channel for queue", queue) + m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue) return err } diff --git a/backends/memory/memory_backend_test.go b/backends/memory/memory_backend_test.go index b2fadeb..b2d1c94 100644 --- a/backends/memory/memory_backend_test.go +++ b/backends/memory/memory_backend_test.go @@ -52,12 +52,12 @@ func TestBasicJobProcessing(t *testing.T) { t.Fatal(err) } - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -126,12 +126,12 @@ func TestBackendConfiguration(t *testing.T) { t.Fatal(err) } - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(100 * time.Millisecond) return }, handler.Concurrency(1), handler.MaxQueueCapacity(1)) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -184,11 +184,11 @@ func TestFutureJobScheduling(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { return }) - if err := nq.Start(ctx, queue, h); err != nil { + if err := nq.Start(ctx, h); err != nil { t.Fatal(err) } @@ -232,7 +232,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { done1 := make(chan bool) done2 := make(chan bool) - h1 := handler.New(func(ctx context.Context) (err error) { + h1 := handler.New(q1, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) if err != nil { @@ -247,7 +247,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { return }) - h2 := handler.New(func(ctx context.Context) (err error) { + h2 := handler.New(q2, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) if err != nil { @@ -262,11 +262,11 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) { return }) - if err := nq.Start(ctx, q1, h1); err != nil { + if err := nq.Start(ctx, h1); err != nil { t.Fatal(err) } - if err := nq.Start(ctx, q2, h2); err != nil { + if err := nq.Start(ctx, h2); err != nil { t.Fatal(err) } @@ -339,7 +339,7 @@ func TestCron(t *testing.T) { defer nq.Shutdown(ctx) done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New("foobar", func(ctx context.Context) (err error) { done <- true return }) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e79b039..bdfc9fa 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -305,18 +305,18 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } // Start starts processing jobs with the specified queue and handler -func (p *PgBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) { +func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { ctx, cancel := context.WithCancel(ctx) - p.logger.Debug("starting job processing", "queue", queue) + p.logger.Debug("starting job processing", "queue", h.Queue) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) - p.handlers[queue] = h + p.handlers[h.Queue] = h p.mu.Unlock() - err = p.start(ctx, queue) + err = p.start(ctx, h) if err != nil { - p.logger.Error("unable to start processing queue", "queue", queue, "error", err) + p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err) return } return @@ -339,8 +339,9 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha } queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) - ctx, cancel := context.WithCancel(ctx) + h.Queue = queue + ctx, cancel := context.WithCancel(ctx) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) p.mu.Unlock() @@ -358,7 +359,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha return fmt.Errorf("error adding cron: %w", err) } - return p.Start(ctx, queue, h) + return p.Start(ctx, h) } // SetLogger sets this backend's logger @@ -505,24 +506,23 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { // start starts processing new, pending, and future jobs // nolint: cyclop -func (p *PgBackend) start(ctx context.Context, queue string) (err error) { - var h handler.Handler +func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { var ok bool - if h, ok = p.handlers[queue]; !ok { - return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) + if h, ok = p.handlers[h.Queue]; !ok { + return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue) } - listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs + listenJobChan, ready := p.listen(ctx, h.Queue) // listen for 'new' jobs defer close(ready) - pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup* + pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup* // wait for the listener to connect and be ready to listen <-ready // process all future jobs and retries - go func() { p.scheduleFutureJobs(ctx, queue) }() + go func() { p.scheduleFutureJobs(ctx, h.Queue) }() for i := 0; i < h.Concurrency; i++ { go func() { @@ -714,7 +714,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connnection", err) + p.logger.Error("unable to acquire new listener connnection", "error", err) return } defer p.release(ctx, conn, queue) @@ -723,7 +723,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection", err) + p.logger.Error("unable to configure listener connection", "error", err) return } @@ -737,7 +737,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re return } - p.logger.Error("failed to wait for notification", waitErr) + p.logger.Error("failed to wait for notification", "error", waitErr) continue } diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 5eea8c3..3950083 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -70,19 +70,19 @@ func TestBasicJobProcessing(t *testing.T) { return } - ctx := context.TODO() + ctx := context.Background() nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString)) if err != nil { t.Fatal(err) } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -184,22 +184,22 @@ func TestBasicJobMultipleQueue(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - h2 := handler.New(func(_ context.Context) (err error) { + h2 := handler.New(queue2, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue2, h2) + err = nq.Start(ctx, h2) if err != nil { t.Error(err) } @@ -264,7 +264,7 @@ func TestCron(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(ctx context.Context) (err error) { done <- true return }) @@ -318,14 +318,14 @@ func TestBasicJobProcessingWithErrors(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { err = errors.New("something bad happened") // nolint: goerr113 return }) nq.SetLogger(testutils.NewTestLogger(logsChan)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index a2199fe..fc16212 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -180,11 +180,6 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption { // Enqueue queues jobs to be executed asynchronously func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { - if job.Queue == "" { - err = jobs.ErrNoQueueSpecified - return - } - err = jobs.FingerprintJob(job) if err != nil { return @@ -205,8 +200,8 @@ func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string } // Start starts processing jobs with the specified queue and handler -func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) (err error) { - b.mux.HandleFunc(queue, func(ctx context.Context, t *asynq.Task) (err error) { +func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { + b.mux.HandleFunc(h.Queue, func(ctx context.Context, t *asynq.Task) (err error) { taskID := t.ResultWriter().TaskID() var p map[string]any if err = json.Unmarshal(t.Payload(), &p); err != nil { @@ -226,7 +221,7 @@ func (b *RedisBackend) Start(_ context.Context, queue string, h handler.Handler) job := &jobs.Job{ CreatedAt: time.Now().UTC(), - Queue: queue, + Queue: h.Queue, Payload: p, Deadline: &ti.Deadline, RunAfter: ti.NextProcessAt, @@ -259,8 +254,9 @@ func (b *RedisBackend) StartCron(ctx context.Context, cronSpec string, h handler } queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr)) + h.Queue = queue - err = b.Start(ctx, queue, h) + err = b.Start(ctx, h) if err != nil { return } diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index 39c91b5..eb487a8 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -86,12 +86,12 @@ func TestBasicJobProcessing(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -143,22 +143,22 @@ func TestBasicJobMultipleQueue(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { done <- true return }) - h2 := handler.New(func(_ context.Context) (err error) { + h2 := handler.New(queue2, func(_ context.Context) (err error) { done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue2, h2) + err = nq.Start(ctx, h2) if err != nil { t.Error(err) } @@ -221,7 +221,7 @@ func TestStartCron(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.NewPeriodic(func(_ context.Context) (err error) { done <- true return }) @@ -268,7 +268,7 @@ func TestJobProcessingWithOptions(t *testing.T) { nq.SetLogger(testutils.NewTestLogger(logsChan)) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(50 * time.Millisecond) return }) @@ -277,7 +277,7 @@ func TestJobProcessingWithOptions(t *testing.T) { handler.Concurrency(1), ) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -334,13 +334,13 @@ func TestJobProcessingWithJobDeadline(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { time.Sleep(50 * time.Millisecond) done <- true return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } diff --git a/examples/add_job_with_custom_concurrency/main.go b/examples/add_job_with_custom_concurrency/main.go index b41554c..05c8dc6 100644 --- a/examples/add_job_with_custom_concurrency/main.go +++ b/examples/add_job_with_custom_concurrency/main.go @@ -25,7 +25,7 @@ func main() { // Concurrency and other options may be set on handlers both during creation (Option 1), or after the fact (Option 2) // Option 1: add options when creating the handler - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) @@ -36,7 +36,7 @@ func main() { // Option 2: Set options after the handler is created h.WithOptions(handler.Concurrency(8)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_job_with_deadline/main.go b/examples/add_job_with_deadline/main.go index 783b076..4c23850 100644 --- a/examples/add_job_with_deadline/main.go +++ b/examples/add_job_with_deadline/main.go @@ -25,12 +25,12 @@ func main() { // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons done := make(chan bool) - h := handler.New(func(_ context.Context) (err error) { + h := handler.New(queue, func(_ context.Context) (err error) { <-done return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_job_with_timeout/main.go b/examples/add_job_with_timeout/main.go index 425309e..4733209 100644 --- a/examples/add_job_with_timeout/main.go +++ b/examples/add_job_with_timeout/main.go @@ -28,7 +28,7 @@ func main() { // this is probably not a pattern you want to use in production jobs and you see it here only for testing reasons done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) j, err = jobs.FromContext(ctx) @@ -40,7 +40,7 @@ func main() { // this 10ms timeout will cause our job that sleeps for 1s to fail h.WithOptions(handler.JobTimeout(10 * time.Millisecond)) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_periodic_jobs/main.go b/examples/add_periodic_jobs/main.go index a95774f..a40d6bc 100644 --- a/examples/add_periodic_jobs/main.go +++ b/examples/add_periodic_jobs/main.go @@ -18,7 +18,7 @@ func main() { } // run a job periodically - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(_ context.Context) (err error) { log.Println("running periodic job") return }) diff --git a/examples/add_postgres_job/main.go b/examples/add_postgres_job/main.go index e0078e9..cd2810b 100644 --- a/examples/add_postgres_job/main.go +++ b/examples/add_postgres_job/main.go @@ -24,7 +24,7 @@ func main() { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job time.Sleep(1 * time.Second) j, err = jobs.FromContext(ctx) @@ -33,7 +33,7 @@ func main() { return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error listening to queue", err) } diff --git a/examples/add_redis_job/main.go b/examples/add_redis_job/main.go index 889b66e..08eae44 100644 --- a/examples/add_redis_job/main.go +++ b/examples/add_redis_job/main.go @@ -11,7 +11,6 @@ import ( ) func main() { - done := make(chan bool) ctx := context.Background() nq, _ := neoq.New(ctx, @@ -20,7 +19,7 @@ func main() { redis.WithPassword(""), ) - nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) { + nq.Start(ctx, handler.New("hello_world", func(ctx context.Context) (err error) { j, _ := jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) done <- true diff --git a/examples/start_processing_jobs/main.go b/examples/start_processing_jobs/main.go index b192c7f..6d0e57c 100644 --- a/examples/start_processing_jobs/main.go +++ b/examples/start_processing_jobs/main.go @@ -20,14 +20,14 @@ func main() { log.Fatalf("error initializing postgres backend: %v", err) } - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { var j *jobs.Job j, err = jobs.FromContext(ctx) log.Println("got job id:", j.ID, "messsage:", j.Payload["message"]) return }) - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { log.Println("error processing queue", err) } diff --git a/handler/handler.go b/handler/handler.go index d8c3bda..9e76ab2 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -30,6 +30,7 @@ type Handler struct { Concurrency int JobTimeout time.Duration QueueCapacity int64 + Queue string } // Option is function that sets optional configuration for Handlers @@ -67,10 +68,19 @@ func MaxQueueCapacity(capacity int64) Option { } } -// New creates a new queue handler -func New(f Func, opts ...Option) (h Handler) { +// Queue configures the name of the queue that the handler runs on +func Queue(queue string) Option { + return func(h *Handler) { + h.Queue = queue + } +} + +// New creates new queue handlers for specific queues. This function is to be usued to create new Handlers for +// non-periodic jobs (most jobs). Use [NewPeriodic] to initialize handlers for periodic jobs. +func New(queue string, f Func, opts ...Option) (h Handler) { h = Handler{ Handle: f, + Queue: queue, } h.WithOptions(opts...) @@ -88,13 +98,20 @@ func New(f Func, opts ...Option) (h Handler) { return } +// NewPeriodic creates new queue handlers for periodic jobs. Use [New] to initialize handlers for non-periodic jobs. +func NewPeriodic(f Func, opts ...Option) (h Handler) { + h = New("", f, opts...) + return +} + // Exec executes handler functions with a concrete timeout func Exec(ctx context.Context, handler Handler) (err error) { timeoutCtx, cancel := context.WithTimeout(ctx, handler.JobTimeout) defer cancel() - var errCh = make(chan error, 1) - var done = make(chan bool) + errCh := make(chan error, 1) + done := make(chan bool) + go func(ctx context.Context) { defer func() { if x := recover(); x != nil { diff --git a/neoq.go b/neoq.go index 2a30d80..b79397c 100644 --- a/neoq.go +++ b/neoq.go @@ -63,8 +63,8 @@ type Neoq interface { // Enqueue queues jobs to be executed asynchronously Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) - // Start starts processing jobs with the specified queue and handler - Start(ctx context.Context, queue string, h handler.Handler) (err error) + // Start starts processing jobs on the queue specified in the Handler + Start(ctx context.Context, h handler.Handler) (err error) // StartCron starts processing jobs with the specified cron schedule and handler // diff --git a/neoq_test.go b/neoq_test.go index 53b3a54..71e80f3 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -102,7 +102,7 @@ func TestStart(t *testing.T) { } defer nq.Shutdown(ctx) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { done <- true return }) @@ -112,7 +112,7 @@ func TestStart(t *testing.T) { ) // process jobs on the test queue - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } @@ -165,7 +165,7 @@ func TestStartCron(t *testing.T) { defer nq.Shutdown(ctx) done := make(chan bool) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.NewPeriodic(func(ctx context.Context) (err error) { done <- true return }) @@ -208,14 +208,14 @@ func TestSetLogger(t *testing.T) { nq.SetLogger(testutils.NewTestLogger(logsChan)) - h := handler.New(func(ctx context.Context) (err error) { + h := handler.New(queue, func(ctx context.Context) (err error) { err = errTrigger return }) if err != nil { t.Error(err) } - err = nq.Start(ctx, queue, h) + err = nq.Start(ctx, h) if err != nil { t.Error(err) } From 88dac639a3fc1c38f6a0b4778064ca7dca8eb372 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 16:30:30 +0200 Subject: [PATCH 2/7] feat: Disallow adding items to the unnamed queue --- backends/memory/memory_backend.go | 5 +++++ backends/postgres/postgres_backend.go | 10 +++++----- backends/redis/redis_backend.go | 5 +++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index adb88c3..e7f1ebf 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -72,6 +72,11 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, var qc any var ok bool + if job.Queue == "" { + err = jobs.ErrNoQueueSpecified + return + } + m.logger.Debug("adding a new job", "queue", job.Queue) if qc, ok = m.queues.Load(job.Queue); !ok { diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index bdfc9fa..d846c4c 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -233,6 +233,11 @@ func (p *PgBackend) initializeDB() (err error) { // Enqueue adds jobs to the specified queue func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { + if job.Queue == "" { + err = jobs.ErrNoQueueSpecified + return + } + p.logger.Debug("enqueueing job payload", slog.Any("job_payload", job.Payload)) ctx, cancel := context.WithCancel(ctx) p.mu.Lock() @@ -266,11 +271,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e job.RunAfter = now } - if job.Queue == "" { - err = jobs.ErrNoQueueSpecified - return - } - jobID, err = p.enqueueJob(ctx, tx, job) if err != nil { var pgErr *pgconn.PgError diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index fc16212..b7b253c 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -180,6 +180,11 @@ func WithShutdownTimeout(timeout time.Duration) neoq.ConfigOption { // Enqueue queues jobs to be executed asynchronously func (b *RedisBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, err error) { + if job.Queue == "" { + err = jobs.ErrNoQueueSpecified + return + } + err = jobs.FingerprintJob(job) if err != nil { return From 94ecb6367fcf40a91aa35e5bd537f1e6248f98cc Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 16:31:05 +0200 Subject: [PATCH 3/7] feat: ensure memory backend exits gracefully during Shutdown() --- backends/memory/memory_backend.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index e7f1ebf..10d749b 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -226,13 +226,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { go func() { var err error var job *jobs.Job - for { select { case job = <-queueChan: err = m.handleJob(ctx, job, h) case <-ctx.Done(): - return + err = ctx.Err() } if err != nil { From 568ab86eb198efc0e6e8d994ce7d0ec8db5306bd Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 16:53:27 +0200 Subject: [PATCH 4/7] fix: Accumulation of cancel() funcs enqueueing on pg backend `cancelFuncs` in the pg backend was intended for gracefully shutting down the backend with Shutdown(). Adding a new cancel function every time a job is added to a queue isn't of any use, because there is nothing to "cancel" by cancelling the context unless a job is in progress. This was effectively a memory leak. --- backends/postgres/postgres_backend.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index d846c4c..9e4aceb 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -239,10 +239,6 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } p.logger.Debug("enqueueing job payload", slog.Any("job_payload", job.Payload)) - ctx, cancel := context.WithCancel(ctx) - p.mu.Lock() - p.cancelFuncs = append(p.cancelFuncs, cancel) - p.mu.Unlock() p.logger.Debug("acquiring new connection from connection pool") conn, err := p.pool.Acquire(ctx) From 6e9aa8da403d4df4ce7d16c0c13889c515760335 Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 17:48:48 +0200 Subject: [PATCH 5/7] maint: Remove dead code from pg backend --- backends/postgres/postgres_backend.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 9e4aceb..eb789a8 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -31,9 +31,7 @@ import ( var migrationsFS embed.FS const ( - postgresBackendName = "postgres" - DefaultPgConnectionString = "postgres://postgres:postgres@127.0.0.1:5432/neoq?sslmode=disable" - PendingJobIDQuery = `SELECT id + PendingJobIDQuery = `SELECT id FROM neoq_jobs WHERE queue = $1 AND status NOT IN ('processed') From 7829f36ea7fca59806e021b67314d6bb6b3523aa Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 19:05:31 +0200 Subject: [PATCH 6/7] fix: Fix naked return in memory backend StartCron --- backends/memory/memory_backend.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index 10d749b..6b61faa 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -181,7 +181,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H return fmt.Errorf("error adding cron: %w", err) } - return + return err } // SetLogger sets this backend's logger From c1139293a36e01b10f93e5d584716fca1c5a2cff Mon Sep 17 00:00:00 2001 From: Adriano Caloiaro Date: Sat, 16 Sep 2023 19:09:46 +0200 Subject: [PATCH 7/7] maint: Update README to point at Getting Started wiki --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b35f836..d07e72e 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ Background job processing for Go [![Go Reference](https://pkg.go.dev/badge/github.com/acaloiaro/neoq.svg)](https://pkg.go.dev/github.com/acaloiaro/neoq) [![Gitter chat](https://badges.gitter.im/gitterHQ/gitter.png)](https://app.gitter.im/#/room/#neoq:gitter.im) -# Usage +# Getting Started -`go get -u github.com/acaloiaro/neoq` +See the [Getting Started](https://github.com/acaloiaro/neoq/wiki/Getting-Started) wiki to get started. # About