From 3a10351049c5277df55ed67363c35bef5be8088b Mon Sep 17 00:00:00 2001 From: Elliot Courant Date: Fri, 6 Oct 2023 14:06:55 -0500 Subject: [PATCH] bug(pg): Adding test to demonstrate bug with uppercase queue names When the queue name is uppercase, the listener will never receive the notification when the job is enqueued. This is done specifically with the enqueuer and consumer being separate neoq instances (server A kicks off a job, server B is listening to perform the work). --- backends/postgres/postgres_backend.go | 1 + backends/postgres/postgres_backend_test.go | 91 ++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index f0925d4..8d2b465 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -779,6 +779,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) + p.logger.Debug("job notification for queue", "queue", queue, "notification", notification) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index 5cc6ce3..bf8b50e 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -575,3 +575,94 @@ func Test_MoveJobsToDeadQueue(t *testing.T) { t.Error("should be dead") } } + +func TestJobEnqueuedSeparately(t *testing.T) { + connString, conn := prepareAndCleanupDB(t) + const queue = "SyncThing" + maxRetries := 5 + done := make(chan bool) + defer close(done) + + timeoutTimer := time.After(30 * time.Second) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + enqueuer, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + postgres.WithSynchronousCommit(false), + neoq.WithLogLevel(logging.LogLevelDebug), + ) + if err != nil { + t.Fatal(err) + } + defer enqueuer.Shutdown(ctx) + + consumer, err := neoq.New(ctx, + neoq.WithBackend(postgres.Backend), + postgres.WithConnectionString(connString), + postgres.WithSynchronousCommit(false), + neoq.WithLogLevel(logging.LogLevelDebug), + ) + if err != nil { + t.Fatal(err) + } + defer consumer.Shutdown(ctx) + h := handler.New(queue, func(_ context.Context) (err error) { + done <- true + return + }) + + go func() { + err = consumer.Start(ctx, h) + if err != nil { + t.Error(err) + } + }() + + // Wait a bit more before enqueueing + time.Sleep(10 * time.Second) + deadline := time.Now().UTC().Add(5 * time.Second) + jid, e := enqueuer.Enqueue(ctx, &jobs.Job{ + Queue: queue, + Payload: map[string]interface{}{ + "message": "hello world", + }, + Deadline: &deadline, + MaxRetries: &maxRetries, + }) + if e != nil || jid == jobs.DuplicateJobID { + t.Error(e) + } + + select { + case <-timeoutTimer: + err = jobs.ErrJobTimeout + case <-done: + } + if err != nil { + t.Error(err) + } + + // ensure job has fields set correctly + var jdl time.Time + var jmxrt int + + err = conn. + QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid). + Scan(&jdl, &jmxrt) + if err != nil { + t.Error(err) + } + + jdl = jdl.In(time.UTC) + // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as + // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html + if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) { + t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113 + } + + if jmxrt != maxRetries { + t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113 + } +}