From 86f7869440ffada0b1a2d3a13aa6bbfe5a3478af Mon Sep 17 00:00:00 2001
From: Adriano Caloiaro
Date: Sun, 3 Dec 2023 17:30:22 -0700
Subject: [PATCH] feat: Multiplex the listener connection

---
 backends/postgres/postgres_backend.go      | 255 ++++++++++++++-------
 backends/postgres/postgres_backend_test.go |  81 ++-----
 2 files changed, 187 insertions(+), 149 deletions(-)

diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go
index e7cc225..ee97491 100644
--- a/backends/postgres/postgres_backend.go
+++ b/backends/postgres/postgres_backend.go
@@ -24,6 +24,7 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/jsuar/go-cron-descriptor/pkg/crondescriptor"
 	"github.com/robfig/cron"
+	"golang.org/x/exp/slices"
 	"golang.org/x/exp/slog"
 )
 
@@ -53,6 +54,7 @@ const (
 		LIMIT 100
 		FOR UPDATE SKIP LOCKED`
 	setIdleInTxSessionTimeout = `SET idle_in_transaction_session_timeout = 0`
+	pgConnectionBusyRetries   = 10 // the number of times to retry busy postgres connections, i.e. PgConn.IsBusy()
 )
 
 type contextKey struct{}
 
@@ -72,14 +74,19 @@ var (
 // PgBackend is a Postgres-based Neoq backend
 type PgBackend struct {
 	neoq.Neoq
-	config      *neoq.Config
-	logger      logging.Logger
-	cron        *cron.Cron
-	pool        *pgxpool.Pool
-	mu          *sync.RWMutex              // mutex to protect mutating state on a pgWorker
-	futureJobs  map[string]*jobs.Job       // map of future job IDs to the corresponding job record
-	handlers    map[string]handler.Handler // a map of queue names to queue handlers
-	cancelFuncs []context.CancelFunc       // A collection of cancel functions to be called upon Shutdown()
+	cancelFuncs    []context.CancelFunc       // cancel functions to be called upon Shutdown()
+	config         *neoq.Config               // backend configuration
+	cron           *cron.Cron                 // scheduler for periodic jobs
+	futureJobs     map[string]*jobs.Job       // map of future job IDs to the corresponding job record
+	handlers       map[string]handler.Handler // a map of queue names to queue handlers
+	newQueues      chan string                // a channel that indicates that new queues are ready to be processed
+	readyQueues    chan string                // a channel that indicates which queues are ready to have jobs processed
+	listenCancelCh chan context.CancelFunc    // cancellation channel for the listenerConn's WaitForNotification call
+	listenerConn   *pgx.Conn                  // dedicated connection that LISTENs for jobs across all queues
+	listenerConnMu *sync.RWMutex              // listenerConnMu protects the listener connection from concurrent access
+	logger         logging.Logger             // backend-wide logger
+	mu             *sync.RWMutex              // protects concurrent access to fields on PgBackend
+	pool           *pgxpool.Pool              // connection pool for backend, used to process and enqueue jobs
 }
 
 // Backend initializes a new postgres-backed neoq backend
@@ -112,12 +119,16 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
 	cfg.PGConnectionTimeout = DefaultConnectionTimeout
 
 	p := &PgBackend{
-		mu:          &sync.RWMutex{},
-		config:      cfg,
-		handlers:    make(map[string]handler.Handler),
-		futureJobs:  make(map[string]*jobs.Job),
-		cron:        cron.New(),
-		cancelFuncs: []context.CancelFunc{},
+		cancelFuncs:    []context.CancelFunc{},
+		config:         cfg,
+		cron:           cron.New(),
+		futureJobs:     make(map[string]*jobs.Job),
+		handlers:       make(map[string]handler.Handler),
+		newQueues:      make(chan string),
+		readyQueues:    make(chan string),
+		listenerConnMu: &sync.RWMutex{},
+		mu:             &sync.RWMutex{},
+		listenCancelCh: make(chan context.CancelFunc, 1),
 	}
 
 	// Set all options
@@ -166,6 +177,14 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
 		}
 	}
 
+	p.listenerConn, err = p.newListenerConn(ctx)
+	if err != nil {
+		p.logger.Error("unable to initialize listener connection", slog.Any("error", err))
+	}
+
+	// monitor handlers for changes and LISTEN when new queues are added
+	go p.newQueueMonitor(ctx)
+
 	p.cron.Start()
 
 	pb = p
@@ -173,6 +192,68 @@ func Backend(ctx context.Context, opts ...neoq.ConfigOption) (pb neoq.Neoq, err
 	return pb, nil
 }
 
+// newQueueMonitor monitors for new queues and instructs the listener connection to LISTEN for jobs on them
+func (p *PgBackend) newQueueMonitor(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case newQueue := <-p.newQueues:
+			p.logger.Debug("configure new handler", "queue", newQueue)
+		setup_listeners:
+			// drain p.listenCancelCh before setting up new listeners
+			select {
+			case cancelListener := <-p.listenCancelCh:
+				p.logger.Debug("canceling previous wait listeners", "queue", newQueue)
+				cancelListener()
+				goto setup_listeners
+			default:
+			}
+
+			p.listenerConnMu.Lock()
+			// note: 'LISTEN <channel>' is idempotent
+			_, err := p.listenerConn.Exec(ctx, fmt.Sprintf(`LISTEN %q`, newQueue))
+			p.listenerConnMu.Unlock()
+			if err != nil {
+				err = fmt.Errorf("unable to configure listener connection: %w", err)
+				p.logger.Error("FATAL ERROR unable to listen for new jobs", slog.String("queue", newQueue), slog.Any("error", err))
+				return
+			}
+
+			p.logger.Debug("listening on queue", "queue", newQueue)
+			p.readyQueues <- newQueue
+		}
+	}
+}
+
+func (p *PgBackend) newListenerConn(ctx context.Context) (conn *pgx.Conn, err error) {
+	var pgxCfg *pgx.ConnConfig
+	pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
+	if err != nil {
+		return
+	}
+
+	// remove any pgxpool parameters before creating a new connection
+	customPgxParams := []string{
+		"pool_max_conns", "pool_min_conns",
+		"pool_max_conn_lifetime", "pool_max_conn_idle_time", "pool_health_check_period",
+		"pool_max_conn_lifetime_jitter",
+	}
+	for param := range pgxCfg.RuntimeParams {
+		if slices.Contains(customPgxParams, param) {
+			delete(pgxCfg.RuntimeParams, param)
+		}
+	}
+	conn, err = pgx.ConnectConfig(ctx, pgxCfg)
+	if err != nil {
+		p.logger.Error("unable to acquire listener connection", slog.Any("error", err))
+		return
+	}
+	_, err = conn.Exec(ctx, "SET idle_in_transaction_session_timeout = 0")
idle_in_transaction_session_timeout = 0") + + return +} + // WithConnectionString configures neoq postgres backend to use the specified connection string when connecting to a backend func WithConnectionString(connectionString string) neoq.ConfigOption { return func(c *neoq.Config) { @@ -357,6 +438,8 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { p.handlers[h.Queue] = h p.mu.Unlock() + p.newQueues <- h.Queue + err = p.start(ctx, h) if err != nil { p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err)) @@ -549,22 +632,25 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { // nolint: cyclop func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { var ok bool + var listenJobChan chan *pgconn.Notification + var errCh chan error if h, ok = p.handlers[h.Queue]; !ok { return fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, h.Queue) } - listenJobChan, ready, errCh := p.listen(ctx, h.Queue) // listen for 'new' jobs - defer close(ready) - pendingJobsChan := p.pendingJobs(ctx, h.Queue) // process overdue jobs *at startup* // wait for the listener to connect and be ready to listen - select { - case <-ready: - break - case err = <-errCh: - return + for q := range p.readyQueues { + if q == h.Queue { + listenJobChan, errCh = p.listen(ctx) + break + } + + p.logger.Debug("Picked up a queue that a different start() will be waiting for. Adding back to ready list", + slog.String("queue", q)) + p.readyQueues <- q } // process all future jobs and retries @@ -573,16 +659,19 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { for i := 0; i < h.Concurrency; i++ { go func() { var err error - var jobID string + var n *pgconn.Notification for { select { - case jobID = <-listenJobChan: - err = p.handleJob(ctx, jobID, h) - case jobID = <-pendingJobsChan: - err = p.handleJob(ctx, jobID, h) + case n = <-listenJobChan: + err = p.handleJob(ctx, n.Payload) + case n = <-pendingJobsChan: + err = p.handleJob(ctx, n.Payload) case <-ctx.Done(): return + case <-errCh: + p.logger.Error("error hanlding job", "error", err) + continue } if err != nil { @@ -595,7 +684,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { "job failed", slog.String("queue", h.Queue), slog.Any("error", err), - slog.String("job_id", jobID), + slog.String("job_id", n.Payload), ) continue @@ -699,8 +788,8 @@ func (p *PgBackend) announceJob(ctx context.Context, queue, jobID string) { } } -func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan string) { - jobsCh = make(chan string) +func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan *pgconn.Notification) { + jobsCh = make(chan *pgconn.Notification) conn, err := p.acquire(ctx) if err != nil { @@ -729,7 +818,7 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan slog.String("job_id", jobID), ) } else { - jobsCh <- jobID + jobsCh <- &pgconn.Notification{Channel: queue, Payload: jobID} } } }(ctx) @@ -741,7 +830,8 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan // it receives pending, periodic, and retry job ids asynchronously // 1. handleJob first creates a transactions inside of which a row lock is acquired for the job to be processed. // 2. 
 // 2. handleJob then calls the handler on the job, and finally updates the job's status
-func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handler) (err error) {
+// nolint: cyclop
+func (p *PgBackend) handleJob(ctx context.Context, jobID string) (err error) {
 	var job *jobs.Job
 	var tx pgx.Tx
 	conn, err := p.acquire(ctx)
@@ -763,7 +853,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
 
 	if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
 		err = jobs.ErrJobExceededDeadline
-		p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", h.Queue), slog.Int64("job_id", job.ID))
+		p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", job.Queue), slog.Int64("job_id", job.ID))
 		err = p.updateJob(ctx, err)
 		return
 	}
@@ -776,8 +866,17 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
 		job.Retries++
 	}
 
+	var jobErr error
+	h, ok := p.handlers[job.Queue]
+	if !ok {
+		p.logger.Error("received a job for which no handler is configured",
+			slog.String("queue", job.Queue),
+			slog.Int64("job_id", job.ID))
+		return handler.ErrNoHandlerForQueue
+	}
+
 	// execute the queue handler of this job
-	jobErr := handler.Exec(ctx, h)
+	jobErr = handler.Exec(ctx, h)
 	err = p.updateJob(ctx, jobErr)
 	if err != nil {
 		if errors.Is(err, context.Canceled) {
@@ -802,77 +901,63 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
 // TODO: There is currently no handling of listener disconnects in PgBackend.
 // This will lead to jobs not getting processed until the worker is restarted.
 // Implement disconnect handling.
-func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, ready chan bool, errCh chan error) {
-	c = make(chan string, p.handlers[queue].Concurrency)
-	ready = make(chan bool)
+func (p *PgBackend) listen(ctx context.Context) (c chan *pgconn.Notification, errCh chan error) {
+	c = make(chan *pgconn.Notification)
 	errCh = make(chan error)
 
-	go func(ctx context.Context) {
-		conn, err := p.acquire(ctx)
-		if err != nil {
-			p.logger.Error("unable to acquire new listener connection", slog.String("queue", queue), slog.Any("error", err))
-			errCh <- err
-			return
-		}
-		defer p.release(ctx, conn, queue)
-
-		// set this connection's idle in transaction timeout to infinite so it is not intermittently disconnected
-		_, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue))
-		if err != nil {
-			err = fmt.Errorf("unable to configure listener connection: %w", err)
-			p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err))
-			errCh <- err
-			return
-		}
-
-		// notify start() that we're ready to listen for jobs
-		ready <- true
+	waitForNotificationCtx, cancel := context.WithCancel(ctx)
+	p.listenCancelCh <- cancel
 
+	go func(ctx context.Context) {
+		var notification *pgconn.Notification
+		var waitErr error
 		for {
-			notification, waitErr := conn.Conn().WaitForNotification(ctx)
-			p.logger.Debug(
-				"job notification for queue",
-				slog.String("queue", queue),
-				slog.Any("notification", notification),
-				slog.Any("err", err),
-			)
+			select {
+			case <-ctx.Done():
+				// our context has been canceled, the system is shutting down
+				return
+			default:
+				p.listenerConnMu.Lock()
+				notification, waitErr = p.listenerConn.WaitForNotification(waitForNotificationCtx)
+				p.listenerConnMu.Unlock()
+			}
 
 			if waitErr != nil {
 				if errors.Is(waitErr, context.Canceled) {
+					// this is likely not a system shutdown, but an interrupt from the goroutine that manages changes to
+					// the list of handlers. It needs the connection to be free so that it can instruct the connection
+					// to start listening on any new queues
+					p.logger.Debug("Stopping notifications processing")
 					return
 				}
 
-				p.logger.Error("failed to wait for notification", slog.String("queue", queue), slog.Any("error", waitErr))
+				// The connection is busy adding new LISTENers
+				if p.listenerConn.PgConn().IsBusy() {
+					p.logger.Debug("listen connection is busy, trying to acquire listener connection again...")
+					waitForNotificationCtx, cancel = context.WithCancel(ctx)
+					p.listenCancelCh <- cancel
+					continue
+				}
+
+				p.logger.Error("failed to wait for notification", slog.Any("error", waitErr))
 				continue
 			}
 
+			p.logger.Debug(
+				"job notification for queue",
+				slog.Any("notification", notification),
+				slog.Any("err", waitErr),
+			)
+
 			// check if Shutdown() has been called
 			if notification.Payload == shutdownJobID {
 				return
 			}
 
-			c <- notification.Payload
+			c <- notification
 		}
 	}(ctx)
 
-	return c, ready, errCh
-}
-
-func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue string) {
-	query := fmt.Sprintf("SET idle_in_transaction_session_timeout = '%d'; UNLISTEN %q", p.config.IdleTransactionTimeout, queue)
-	_, err := conn.Exec(ctx, query)
-	if err != nil {
-		if errors.Is(err, context.Canceled) {
-			return
-		}
-
-		p.logger.Error(
-			"unable to reset connection config before release",
-			slog.String("queue", queue),
-			slog.Any("error", err),
-		)
-	}
-
-	conn.Release()
+	return c, errCh
 }
 
 func (p *PgBackend) getJob(ctx context.Context, tx pgx.Tx, jobID string) (job *jobs.Job, err error) {
diff --git a/backends/postgres/postgres_backend_test.go b/backends/postgres/postgres_backend_test.go
index e2dc4c0..c41aef4 100644
--- a/backends/postgres/postgres_backend_test.go
+++ b/backends/postgres/postgres_backend_test.go
@@ -4,9 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"log"
 	"os"
-	"regexp"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -155,11 +153,19 @@ func TestBasicJobProcessing(t *testing.T) {
 
 func TestMultipleProcessors(t *testing.T) {
 	const queue = "testing"
+	var execCount uint32
+	var wg sync.WaitGroup
 
 	connString, _ := prepareAndCleanupDB(t)
 
-	var execCount uint32
-	var wg sync.WaitGroup
+	h := handler.New(queue, func(_ context.Context) (err error) {
+		atomic.AddUint32(&execCount, 1)
+		wg.Done()
+		return
+	})
+	// Make sure that each neoq worker only works on one thing at a time.
+	h.Concurrency = 1
+
 	neos := make([]neoq.Neoq, 0, ConcurrentWorkers)
 	// Create several neoq processors such that we can enqueue several jobs and have them consumed by multiple different
 	// workers. We want to make sure that a job is not processed twice in a pool of many different neoq workers.
@@ -173,17 +179,6 @@ func TestMultipleProcessors(t *testing.T) {
 			nq.Shutdown(ctx)
 		})
 
-		h := handler.New(queue, func(_ context.Context) (err error) {
-			// Make sure that by wasting some time working on a thing we don't consume two jobs back to back.
-			// This should give the other neoq clients enough time to grab a job as well.
-			time.Sleep(500 * time.Millisecond)
-			atomic.AddUint32(&execCount, 1)
-			wg.Done()
-			return
-		})
-		// Make sure that each neoq worker only works on one thing at a time.
-		h.Concurrency = 1
-
 		err = nq.Start(ctx, h)
 		if err != nil {
 			t.Error(err)
 		}
@@ -271,7 +266,12 @@ func TestBasicJobMultipleQueue(t *testing.T) {
 	connString, _ := prepareAndCleanupDB(t)
 	ctx := context.TODO()
 
-	nq, err := neoq.New(ctx, neoq.WithBackend(postgres.Backend), postgres.WithConnectionString(connString))
+	nq, err := neoq.New(ctx,
+		neoq.WithBackend(postgres.Backend),
+		postgres.WithConnectionString(connString),
+		neoq.WithLogLevel(logging.LogLevelDebug),
+		postgres.WithConnectionTimeout(1*time.Second),
+	)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -759,13 +759,8 @@ func Test_ConnectionTimeout(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	h := handler.New(queue, func(_ context.Context) (err error) {
-		done <- true
-		return
-	})
-
 	go func() {
-		err = nq.Start(ctx, h)
+		_, err = nq.Enqueue(ctx, &jobs.Job{Queue: queue})
 		done <- true
 	}()
 
@@ -779,46 +774,4 @@ func Test_ConnectionTimeout(t *testing.T) {
 	if !errors.Is(err, postgres.ErrExceededConnectionPoolTimeout) {
 		t.Error(err)
 	}
-
-	// Create an instance with a non-zero timeout, but only give allow a pool size of 1
-	// this will trquire a failure to acquire connections when the number of Start() calls exceeds 1
-	nq, err = neoq.New(ctx,
-		neoq.WithBackend(postgres.Backend),
-		postgres.WithConnectionString(maxConnsDBUrl(1)),
-		postgres.WithConnectionTimeout(100*time.Millisecond))
-	if err != nil {
-		t.Fatal(err)
-		return
-	}
-
-	go func() {
-		err = nq.Start(ctx, h)
-		if err != nil {
-			return
-		}
-
-		err = nq.Start(ctx, h)
-		done <- true
-	}()
-
-	timeoutTimer = time.After(5 * time.Second)
-	select {
-	case <-timeoutTimer:
-		err = jobs.ErrJobTimeout
-	case <-done:
-	}
-
-	log.Println("The error is", err)
-	if !errors.Is(err, postgres.ErrExceededConnectionPoolTimeout) {
-		t.Error(err)
-	}
-}
-
-func maxConnsDBUrl(maxConns int) (dbURL string) {
-	dbURL = os.Getenv("TEST_DATABASE_URL")
-	r := regexp.MustCompile(`pool_max_conns=\d+`)
-	dbURL = string(r.ReplaceAll([]byte(dbURL), []byte(fmt.Sprintf("pool_max_conns=%d", maxConns))))
-
-	log.Println("URL", dbURL)
-	return
 }
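
Note (not part of the patch): below is a minimal, self-contained sketch of the pattern this commit adopts — one dedicated connection multiplexing LISTEN across every queue, with the blocking WaitForNotification call interrupted via context cancellation whenever a new LISTEN statement must run on that same connection. It is an approximation under stated assumptions, not neoq's actual API: the multiplexedListener type, the queue names, and the DSN are invented for illustration, and a mutex-guarded cancel function stands in for the patch's listenCancelCh drain and PgConn().IsBusy() retry machinery.

// listener_sketch.go — illustrative only; connection string and queue names are placeholders.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/jackc/pgx/v5"
)

type multiplexedListener struct {
	conn    *pgx.Conn
	pending chan string // queued LISTEN requests from Subscribe

	mu         sync.Mutex
	cancelWait context.CancelFunc // interrupts the current WaitForNotification
}

// Subscribe queues a LISTEN for queue and wakes the listen loop so the
// statement can run on the shared connection.
func (l *multiplexedListener) Subscribe(queue string) {
	l.pending <- queue
	l.mu.Lock()
	if l.cancelWait != nil {
		l.cancelWait()
	}
	l.mu.Unlock()
}

// Listen owns the connection: LISTEN statements and notification waits are
// serialized in this one goroutine, so the conn is never used concurrently.
func (l *multiplexedListener) Listen(ctx context.Context) {
	for ctx.Err() == nil {
		// arm a cancelable wait before draining, so a Subscribe that lands
		// between the drain and the wait still wakes us up
		waitCtx, cancel := context.WithCancel(ctx)
		l.mu.Lock()
		l.cancelWait = cancel
		l.mu.Unlock()

		// issue any pending LISTEN statements while the connection is free
	drain:
		for {
			select {
			case q := <-l.pending:
				if _, err := l.conn.Exec(ctx, fmt.Sprintf("LISTEN %q", q)); err != nil {
					log.Println("unable to listen:", err)
				}
			default:
				break drain
			}
		}

		n, err := l.conn.WaitForNotification(waitCtx)
		cancel()
		if err != nil {
			if errors.Is(err, context.Canceled) {
				continue // woken by Subscribe (or shutting down); loop re-checks
			}
			log.Println("failed to wait for notification:", err)
			return
		}
		log.Printf("job %s ready on queue %s", n.Payload, n.Channel)
	}
}

func main() {
	ctx := context.Background()
	conn, err := pgx.Connect(ctx, "postgres://postgres:postgres@localhost:5432/neoq_test")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close(ctx)

	l := &multiplexedListener{conn: conn, pending: make(chan string, 8)}
	go l.Listen(ctx)

	l.Subscribe("email_queue")  // hypothetical queue names; both share
	l.Subscribe("report_queue") // the single listener connection
	time.Sleep(time.Minute)     // e.g. run NOTIFY "email_queue", 'job-1' from psql
}

Compared with the one-connection-per-queue listen() this patch removes, the multiplexed approach keeps the Postgres connection count constant as queues are added, at the cost of having to interrupt and re-arm the shared wait loop whenever the queue set changes — which is exactly the coordination the patch implements with listenCancelCh, listenerConnMu, and the IsBusy() retry path.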