diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index f0925d4..8d2b465 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -779,6 +779,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) + p.logger.Debug("job notification for queue", "queue", queue, "notification", notification) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 5cc6ce3..bf8b50e 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -575,3 +575,94 @@ func Test_MoveJobsToDeadQueue(t *testing.T) { t.Error("should be dead") } } + +func TestJobEnqueuedSeparately(t *testing.T) { + connString, conn := prepareAndCleanupDB(t) + const queue = "SyncThing" + maxRetries := 5 + done := make(chan bool) + defer close(done) + + timeoutTimer := time.After(30 * time.Second) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + enqueuer, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + postgres.WithSynchronousCommit(false), + neoq.WithLogLevel(logging.LogLevelDebug), + ) + if err != nil { + t.Fatal(err) + } + defer enqueuer.Shutdown(ctx) + + consumer, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + postgres.WithSynchronousCommit(false), + neoq.WithLogLevel(logging.LogLevelDebug), + ) + if err != nil { + t.Fatal(err) + } + defer consumer.Shutdown(ctx) + h := handler.New(queue, func(_ context.Context) (err error) { + done <- true + return + }) + + go func() { + err = consumer.Start(ctx, h) + if err != nil { + t.Error(err) + } + }() + + // Wait a bit more before enqueueing + time.Sleep(10 * time.Second) + deadline := time.Now().UTC().Add(5 * time.Second) + jid, e := enqueuer.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + Deadline: &deadline, + MaxRetries: &maxRetries, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + } + if err != nil { + t.Error(err) + } + + // ensure job has fields set correctly + var jdl time.Time + var jmxrt int + + err = conn. + QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid). + Scan(&jdl, &jmxrt) + if err != nil { + t.Error(err) + } + + jdl = jdl.In(time.UTC) + // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as + // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html + if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) { + t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113 + } + + if jmxrt != maxRetries { + t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113 + } +}