Skip to content

Commit

Permalink
break: Move queue name into Handler
Browse files Browse the repository at this point in the history
The goal of this change is to simplify the API. Handlers and queues
have a 1:1 mapping, so functions that receive both a `queue` and a
`handler` take two parameters when they could take only one.
  • Loading branch information
acaloiaro committed Sep 15, 2023
1 parent 18b811f commit cc476d0
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 101 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter.
```go
ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "message:", j.Payload["message"])
return
Expand All @@ -57,22 +57,22 @@ nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {

Enqueuing adds jobs to the specified queue to be processed asynchronously.

**Example**: Add a "Hello World" job to the `hello_world` queue using the default in-memory backend.
**Example**: Add a "Hello World" job to the `greetings` queue using the default in-memory backend.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Payload: map[string]interface{}{
Queue: "greetings",
Payload: map[string]any{
"message": "hello world",
},
})
```

## Redis

**Example**: Process jobs on the "hello_world" queue and add a job to it using the redis backend
**Example**: Process jobs on the "greetings" queue and add a job to it using the redis backend

```go
ctx := context.Background()
Expand All @@ -81,14 +81,14 @@ nq, _ := neoq.New(ctx,
redis.WithAddr("localhost:6379"),
redis.WithPassword(""))

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "message:", j.Payload["message"])
return
}))

nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Queue: "greetings",
Payload: map[string]interface{}{
"message": "hello world",
},
Expand All @@ -97,7 +97,7 @@ nq.Enqueue(ctx, &jobs.Job{

## Postgres

**Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend
**Example**: Process jobs on the "greetings" queue and add a job to it using the postgres backend

```go
ctx := context.Background()
Expand All @@ -106,14 +106,14 @@ nq, _ := neoq.New(ctx,
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "message:", j.Payload["message"])
return
}))

nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Queue: "greetings",
Payload: map[string]interface{}{
"message": "hello world",
},
Expand Down
17 changes: 8 additions & 9 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified

return
}

Expand Down Expand Up @@ -121,22 +120,22 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
}

// Start starts processing jobs with the specified queue and handler
func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity := h.QueueCapacity
if queueCapacity == emptyCapacity {
queueCapacity = defaultMemQueueCapacity
}

m.handlers.Store(queue, h)
m.queues.Store(queue, make(chan *jobs.Job, queueCapacity))
m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

ctx, cancel := context.WithCancel(ctx)

m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()

err = m.start(ctx, queue)
err = m.start(ctx, h.Queue)
if err != nil {
return
}
Expand Down Expand Up @@ -164,8 +163,9 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue

err = m.Start(ctx, queue, h)
err = m.Start(ctx, h)
if err != nil {
return fmt.Errorf("error processing queue '%s': %w", queue, err)
}
Expand Down Expand Up @@ -203,13 +203,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

if ht, ok = m.handlers.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", queue)
m.logger.Error("error loading handler for queue", "queue", queue)
return
}

if qc, ok = m.queues.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
m.logger.Error("error loading channel for queue", queue)
m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue)
return err
}

Expand Down
22 changes: 11 additions & 11 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBasicJobProcessing(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -126,12 +126,12 @@ func TestBackendConfiguration(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
time.Sleep(100 * time.Millisecond)
return
}, handler.Concurrency(1), handler.MaxQueueCapacity(1))

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -184,11 +184,11 @@ func TestFutureJobScheduling(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(ctx context.Context) (err error) {
h := handler.New(queue, func(ctx context.Context) (err error) {
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -232,7 +232,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
done1 := make(chan bool)
done2 := make(chan bool)

h1 := handler.New(func(ctx context.Context) (err error) {
h1 := handler.New(q1, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -247,7 +247,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

h2 := handler.New(func(ctx context.Context) (err error) {
h2 := handler.New(q2, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -262,11 +262,11 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

if err := nq.Start(ctx, q1, h1); err != nil {
if err := nq.Start(ctx, h1); err != nil {
t.Fatal(err)
}

if err := nq.Start(ctx, q2, h2); err != nil {
if err := nq.Start(ctx, h2); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -339,7 +339,7 @@ func TestCron(t *testing.T) {
defer nq.Shutdown(ctx)

done := make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
h := handler.New("foobar", func(ctx context.Context) (err error) {
done <- true
return
})
Expand Down
34 changes: 17 additions & 17 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,18 +305,18 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
}

// Start starts processing jobs with the specified queue and handler
func (p *PgBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
ctx, cancel := context.WithCancel(ctx)

p.logger.Debug("starting job processing", "queue", queue)
p.logger.Debug("starting job processing", "queue", h.Queue)
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.handlers[queue] = h
p.handlers[h.Queue] = h
p.mu.Unlock()

err = p.start(ctx, queue)
err = p.start(ctx, h)
if err != nil {
p.logger.Error("unable to start processing queue", "queue", queue, "error", err)
p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err)
return
}
return
Expand All @@ -339,8 +339,9 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
}

queue := internal.StripNonAlphanum(strcase.ToSnake(*cdStr))
ctx, cancel := context.WithCancel(ctx)
h.Queue = queue

ctx, cancel := context.WithCancel(ctx)
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.mu.Unlock()
Expand All @@ -358,7 +359,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
return fmt.Errorf("error adding cron: %w", err)
}

return p.Start(ctx, queue, h)
return p.Start(ctx, h)
}

// SetLogger sets this backend's logger
Expand Down Expand Up @@ -505,24 +506,23 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {

// start starts processing new, pending, and future jobs
// nolint: cyclop
func (p *PgBackend) start(ctx context.Context, queue string) (err error) {
var h handler.Handler
func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
var ok bool

if h, ok = p.handlers[queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
if h, ok = p.handlers[h.Queue]; !ok {
return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue)
}

listenJobChan, ready := p.listen(ctx, queue) // listen for 'new' jobs
listenJobChan, ready := p.listen(ctx, h.Queue) // listen for 'new' jobs
defer close(ready)

pendingJobsChan := p.pendingJobs(ctx, queue) // process overdue jobs *at startup*
pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup*

// wait for the listener to connect and be ready to listen
<-ready

// process all future jobs and retries
go func() { p.scheduleFutureJobs(ctx, queue) }()
go func() { p.scheduleFutureJobs(ctx, h.Queue) }()

for i := 0; i < h.Concurrency; i++ {
go func() {
Expand Down Expand Up @@ -714,7 +714,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
go func(ctx context.Context) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new listener connnection", err)
p.logger.Error("unable to acquire new listener connnection", "error", err)
return
}
defer p.release(ctx, conn, queue)
Expand All @@ -723,7 +723,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
_, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", err)
p.logger.Error("unable to configure listener connection", "error", err)
return
}

Expand All @@ -737,7 +737,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
return
}

p.logger.Error("failed to wait for notification", waitErr)
p.logger.Error("failed to wait for notification", "error", waitErr)
continue
}

Expand Down
Loading

0 comments on commit cc476d0

Please sign in to comment.