From 8b21247f9bc3d8d7f390fa1d3fc4446034458a80 Mon Sep 17 00:00:00 2001 From: Elliot Courant Date: Fri, 6 Oct 2023 15:42:30 -0500 Subject: [PATCH] fix(pg): Fixing notify handling name cases oddly. This makes it so that channel names for NOTIFY/LISTEN/UNLISTEN are consistent. That is that they are double quoted in all instances except for when they are referenced by `pg_notify`. This way the channel names are no longer case-insensitive. This happens because of how PostgreSQL parses the SQL statements for NOTIFY/LISTEN/UNLISTEN and treats the arguments as an identifier. Where as it treats the arguments to `pg_notify` as only a string. This causes arguments that are not double quoted to be adjusted to lower case when passed to the SQL statement. By making all usages consistent, we no longer have to worry about channel names being odd. --- backends/postgres/postgres_backend.go | 10 +++--- backends/postgres/postgres_backend_test.go | 39 ++++------------------ 2 files changed, 11 insertions(+), 38 deletions(-) diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index 8d2b465..048e1a5 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -649,7 +649,7 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { defer func(ctx context.Context) { _ = tx.Rollback(ctx) }(ctx) // notify listeners that a job is ready to run - _, err = tx.Exec(ctx, fmt.Sprintf("NOTIFY %s, '%s'", queue, jobID)) + _, err = tx.Exec(ctx, fmt.Sprintf(`SELECT pg_notify('%s', '%s')`, queue, jobID)) if err != nil { return } @@ -761,13 +761,13 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connnection", "queue", queue, "error", err) + p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err) return } defer p.release(ctx, conn, queue) // set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected - _, err = conn.Exec(ctx, fmt.Sprintf("SET idle_in_transaction_session_timeout = '0'; LISTEN %s", queue)) + _, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) p.logger.Error("unable to configure listener connection", "queue", queue, "error", err) @@ -779,7 +779,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) - p.logger.Debug("job notification for queue", "queue", queue, "notification", notification) + p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return @@ -802,7 +802,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re } func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) { - query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %s", p.config.IdleTransactionTimeout, queue) + query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %q", p.config.IdleTransactionTimeout, queue) _, err := conn.Exec(ctx, query) if err != nil { if errors.Is(err, context.Canceled) { diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go index bf8b50e..df5c8a2 100644 --- a/backends/postgres/postgres_backend_test.go +++ b/backends/postgres/postgres_backend_test.go @@ -577,13 +577,13 @@ func Test_MoveJobsToDeadQueue(t *testing.T) { } func TestJobEnqueuedSeparately(t *testing.T) { - connString, conn := prepareAndCleanupDB(t) + connString, _ := prepareAndCleanupDB(t) const queue = "SyncThing" maxRetries := 5 done := make(chan bool) defer close(done) - timeoutTimer := time.After(30 * time.Second) + timeoutTimer := time.After(5 * time.Second) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -613,22 +613,17 @@ func TestJobEnqueuedSeparately(t *testing.T) { return }) - go func() { - err = consumer.Start(ctx, h) - if err != nil { - t.Error(err) - } - }() + err = consumer.Start(ctx, h) + if err != nil { + t.Error(err) + } // Wait a bit more before enqueueing - time.Sleep(10 * time.Second) - deadline := time.Now().UTC().Add(5 * time.Second) jid, e := enqueuer.Enqueue(ctx, &jobs.Job{ Queue: queue, Payload: map[string]interface{}{ "message": "hello world", }, - Deadline: &deadline, MaxRetries: &maxRetries, }) if e != nil || jid == jobs.DuplicateJobID { @@ -643,26 +638,4 @@ func TestJobEnqueuedSeparately(t *testing.T) { if err != nil { t.Error(err) } - - // ensure job has fields set correctly - var jdl time.Time - var jmxrt int - - err = conn. - QueryRow(context.Background(), "SELECT deadline,max_retries FROM neoq_jobs WHERE id = $1", jid). - Scan(&jdl, &jmxrt) - if err != nil { - t.Error(err) - } - - jdl = jdl.In(time.UTC) - // dates from postgres come out with only 6 decimal places of millisecond precision, naively format dates as - // strings for comparison reasons. Ref https://www.postgresql.org/docs/current/datatype-datetime.html - if jdl.Format(time.RFC3339) != deadline.Format(time.RFC3339) { - t.Error(fmt.Errorf("job deadline does not match its expected value: %v != %v", jdl, deadline)) // nolint: goerr113 - } - - if jmxrt != maxRetries { - t.Error(fmt.Errorf("job MaxRetries does not match its expected value: %v != %v", jmxrt, maxRetries)) // nolint: goerr113 - } }