Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latest development #67

Merged
merged 7 commits into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ Background job processing for Go

[![Go Reference](https://pkg.go.dev/badge/github.com/acaloiaro/neoq.svg)](https://pkg.go.dev/github.com/acaloiaro/neoq) [![Gitter chat](https://badges.gitter.im/gitterHQ/gitter.png)](https://app.gitter.im/#/room/#neoq:gitter.im)

# Usage
# Getting Started

`go get -u github.com/acaloiaro/neoq`
See the [Getting Started](https://github.com/acaloiaro/neoq/wiki/Getting-Started) wiki to get started.

# About

Expand Down Expand Up @@ -46,7 +46,7 @@ Queue Handlers are simple Go functions that accept a `Context` parameter.
```go
ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
Expand All @@ -57,22 +57,22 @@ nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {

Enqueuing adds jobs to the specified queue to be processed asynchronously.

**Example**: Add a "Hello World" job to the `hello_world` queue using the default in-memory backend.
**Example**: Add a "Hello World" job to the `greetings` queue using the default in-memory backend.

```go
ctx := context.Background()
nq, _ := neoq.New(ctx, neoq.WithBackend(memory.Backend))
nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Payload: map[string]interface{}{
Queue: "greetings",
Payload: map[string]any{
"message": "hello world",
},
})
```

## Redis

**Example**: Process jobs on the "hello_world" queue and add a job to it using the redis backend
**Example**: Process jobs on the "greetings" queue and add a job to it using the redis backend

```go
ctx := context.Background()
Expand All @@ -81,14 +81,14 @@ nq, _ := neoq.New(ctx,
redis.WithAddr("localhost:6379"),
redis.WithPassword(""))

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}))

nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Queue: "greetings",
Payload: map[string]interface{}{
"message": "hello world",
},
Expand All @@ -97,7 +97,7 @@ nq.Enqueue(ctx, &jobs.Job{

## Postgres

**Example**: Process jobs on the "hello_world" queue and add a job to it using the postgres backend
**Example**: Process jobs on the "greetings" queue and add a job to it using the postgres backend

```go
ctx := context.Background()
Expand All @@ -106,14 +106,14 @@ nq, _ := neoq.New(ctx,
postgres.WithConnectionString("postgres://postgres:[email protected]:5432/neoq"),
)

nq.Start(ctx, "hello_world", handler.New(func(ctx context.Context) (err error) {
nq.Start(ctx, handler.New("greetings", func(ctx context.Context) (err error) {
j, _ := jobs.FromContext(ctx)
log.Println("got job id:", j.ID, "messsage:", j.Payload["message"])
return
}))

nq.Enqueue(ctx, &jobs.Job{
Queue: "hello_world",
Queue: "greetings",
Payload: map[string]interface{}{
"message": "hello world",
},
Expand Down
27 changes: 15 additions & 12 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
var qc any
var ok bool

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified
return
}

m.logger.Debug("adding a new job", "queue", job.Queue)

if qc, ok = m.queues.Load(job.Queue); !ok {
Expand All @@ -89,7 +94,6 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,

if job.Queue == "" {
err = jobs.ErrNoQueueSpecified

return
}

Expand Down Expand Up @@ -121,22 +125,22 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
}

// Start starts processing jobs with the specified queue and handler
func (m *MemBackend) Start(ctx context.Context, queue string, h handler.Handler) (err error) {
func (m *MemBackend) Start(ctx context.Context, h handler.Handler) (err error) {
queueCapacity := h.QueueCapacity
if queueCapacity == emptyCapacity {
queueCapacity = defaultMemQueueCapacity
}

m.handlers.Store(queue, h)
m.queues.Store(queue, make(chan *jobs.Job, queueCapacity))
m.handlers.Store(h.Queue, h)
m.queues.Store(h.Queue, make(chan *jobs.Job, queueCapacity))

ctx, cancel := context.WithCancel(ctx)

m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()

err = m.start(ctx, queue)
err = m.start(ctx, h.Queue)
if err != nil {
return
}
Expand Down Expand Up @@ -164,8 +168,9 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
m.mu.Lock()
m.cancelFuncs = append(m.cancelFuncs, cancel)
m.mu.Unlock()
h.Queue = queue

err = m.Start(ctx, queue, h)
err = m.Start(ctx, h)
if err != nil {
return fmt.Errorf("error processing queue '%s': %w", queue, err)
}
Expand All @@ -176,7 +181,7 @@ func (m *MemBackend) StartCron(ctx context.Context, cronSpec string, h handler.H
return fmt.Errorf("error adding cron: %w", err)
}

return
return err
}

// SetLogger sets this backend's logger
Expand All @@ -203,13 +208,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

if ht, ok = m.handlers.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", queue)
m.logger.Error("error loading handler for queue", "queue", queue)
return
}

if qc, ok = m.queues.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, queue)
m.logger.Error("error loading channel for queue", queue)
m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue)
return err
}

Expand All @@ -222,13 +226,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
go func() {
var err error
var job *jobs.Job

for {
select {
case job = <-queueChan:
err = m.handleJob(ctx, job, h)
case <-ctx.Done():
return
err = ctx.Err()
}

if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions backends/memory/memory_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ func TestBasicJobProcessing(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
done <- true
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -126,12 +126,12 @@ func TestBackendConfiguration(t *testing.T) {
t.Fatal(err)
}

h := handler.New(func(_ context.Context) (err error) {
h := handler.New(queue, func(_ context.Context) (err error) {
time.Sleep(100 * time.Millisecond)
return
}, handler.Concurrency(1), handler.MaxQueueCapacity(1))

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -184,11 +184,11 @@ func TestFutureJobScheduling(t *testing.T) {
}
defer nq.Shutdown(ctx)

h := handler.New(func(ctx context.Context) (err error) {
h := handler.New(queue, func(ctx context.Context) (err error) {
return
})

if err := nq.Start(ctx, queue, h); err != nil {
if err := nq.Start(ctx, h); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -232,7 +232,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
done1 := make(chan bool)
done2 := make(chan bool)

h1 := handler.New(func(ctx context.Context) (err error) {
h1 := handler.New(q1, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -247,7 +247,7 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

h2 := handler.New(func(ctx context.Context) (err error) {
h2 := handler.New(q2, func(ctx context.Context) (err error) {
var j *jobs.Job
j, err = jobs.FromContext(ctx)
if err != nil {
Expand All @@ -262,11 +262,11 @@ func TestFutureJobSchedulingMultipleQueues(t *testing.T) {
return
})

if err := nq.Start(ctx, q1, h1); err != nil {
if err := nq.Start(ctx, h1); err != nil {
t.Fatal(err)
}

if err := nq.Start(ctx, q2, h2); err != nil {
if err := nq.Start(ctx, h2); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -339,7 +339,7 @@ func TestCron(t *testing.T) {
defer nq.Shutdown(ctx)

done := make(chan bool)
h := handler.New(func(ctx context.Context) (err error) {
h := handler.New("foobar", func(ctx context.Context) (err error) {
done <- true
return
})
Expand Down
Loading