diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 208eb83..f0925d4 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -441,7 +441,7 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job } // moveToDeadQueue moves jobs from the pending queue to the dead queue -func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, jobErr error) (err error) { +func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, jobErr string) (err error) { _, err = tx.Exec(ctx, "DELETE FROM neoq_jobs WHERE id = $1", j.ID) if err != nil { return @@ -449,7 +449,7 @@ func (p *PgBackend) moveToDeadQueue(ctx context.Context, tx pgx.Tx, j *jobs.Job, _, err = tx.Exec(ctx, `INSERT INTO neoq_dead_jobs(id, queue, fingerprint, payload, retries, max_retries, error, deadline) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr.Error(), j.Deadline) + j.ID, j.Queue, j.Fingerprint, j.Payload, j.Retries, j.MaxRetries, jobErr, j.Deadline) return } @@ -491,7 +491,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { } if job.MaxRetries != nil && job.Retries >= *job.MaxRetries { - err = p.moveToDeadQueue(ctx, tx, job, jobErr) + err = p.moveToDeadQueue(ctx, tx, job, errMsg) return } diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 352b868..5cc6ce3 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -18,6 +18,7 @@ import ( "github.com/acaloiaro/neoq/jobs" "github.com/acaloiaro/neoq/logging" "github.com/acaloiaro/neoq/testutils" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -25,9 +26,7 @@ const ( ConcurrentWorkers = 8 ) -var ( - errPeriodicTimeout = errors.New("timed out waiting for periodic job") -) +var errPeriodicTimeout = errors.New("timed out waiting for periodic job") // prepareAndCleanupDB should be run at the beginning of each test. It will check to see if the TEST_DATABASE_URL is // present and has a valid connection string. If it does it will connect to the DB and clean up any jobs that might be @@ -497,3 +496,82 @@ results_loop: t.Error(err) } } + +// Test_MoveJobsToDeadQueue tests that when a job's MaxRetries is reached, that the job is moved ot the dead queue successfully +func Test_MoveJobsToDeadQueue(t *testing.T) { + connString, conn := prepareAndCleanupDB(t) + const queue = "testing" + maxRetries := 0 + done := make(chan bool) + defer close(done) + + timeoutTimer := time.After(5 * time.Second) + + ctx := context.Background() + nq, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + postgres.WithTransactionTimeout(60000)) + if err != nil { + t.Fatal(err) + } + defer nq.Shutdown(ctx) + + h := handler.New(queue, func(_ context.Context) (err error) { + done <- true + panic("no good") + }) + + err = nq.Start(ctx, h) + if err != nil { + t.Error(err) + } + + jid, e := nq.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + MaxRetries: &maxRetries, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + } + if err != nil { + t.Error(err) + } + + // ensure job has fields set correctly + maxWait := time.Now().Add(30 * time.Second) + var status string + for { + if time.Now().After(maxWait) { + break + } + + err = conn. + QueryRow(context.Background(), "SELECT status FROM neoq_dead_jobs WHERE id = $1", jid). + Scan(&status) + + if err == nil { + break + } + + if err != nil && errors.Is(err, pgx.ErrNoRows) { + time.Sleep(50 * time.Millisecond) + continue + } else if err != nil { + t.Error(err) + } + } + + if status != internal.JobStatusFailed { + t.Error("should be dead") + } +}