From 82d82a5991caef2fa3bed31101d8a047bb4984e4 Mon Sep 17 00:00:00 2001 From: Elliot Courant Date: Sun, 5 Nov 2023 18:57:01 -0600 Subject: [PATCH 1/3] chore(log): Tweaking logging again, making everything consistent This is another logging change to make everything consistent. Log key-value pairs are now done via slog's Attr format based on the notes at the tail end here: https://go.dev/blog/slog While this might be a more opinionated change, I see one major benefit. If the user of the library is wrapping the logging with their own logger that they are using (zap, logrus or any else) they would have to parse both the alternating key value pairs and the Attr structs. By only using the Attr structs the user would only need to handle that one object type now in their own logging layer; hopefully making integration easier for them to have consistent logs. --- backends/memory/memory_backend.go | 38 ++++++++--- backends/postgres/postgres_backend.go | 96 +++++++++++++++++++-------- backends/redis/redis_backend.go | 12 ++-- 3 files changed, 102 insertions(+), 44 deletions(-) diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index bf1f64a..b0ec080 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -81,7 +81,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, job.Status = internal.JobStatusNew } - m.logger.Debug("adding a new job", "queue", job.Queue) + m.logger.Debug("adding a new job", slog.String("queue", job.Queue)) if qc, ok = m.queues.Load(job.Queue); !ok { return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue) @@ -213,12 +213,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { if ht, ok = m.handlers.Load(queue); !ok { err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) - m.logger.Error("error loading handler for queue", "queue", queue) + m.logger.Error("error loading handler for queue", slog.String("queue", queue)) return } if qc, ok = m.queues.Load(queue); !ok { - m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue) + m.logger.Error("error loading channel for queue", slog.String("queue", queue), slog.Any("error", handler.ErrNoHandlerForQueue)) return err } @@ -247,7 +247,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { return } - m.logger.Error("job failed", "job_id", job.ID, "error", err) + m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err)) runAfter := internal.CalculateBackoff(job.Retries) job.RunAfter = runAfter @@ -286,11 +286,17 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) { timeUntilRunAfter := time.Until(job.RunAfter) if timeUntilRunAfter <= m.config.FutureJobWindow { m.removeFutureJob(job.ID) - m.logger.Debug("dequeued job", "queue", job.Queue, "retries", job.Retries, "next_run", timeUntilRunAfter, "job_id", job.ID) + m.logger.Debug( + "dequeued job", + slog.String("queue", job.Queue), + slog.Int("retries", job.Retries), + slog.String("next_run", timeUntilRunAfter.String()), + slog.Int64("job_id", job.ID), + ) go func(j *jobs.Job) { scheduleCh := time.After(timeUntilRunAfter) <-scheduleCh - m.logger.Debug("loading job for queue", "queue", j.Queue) + m.logger.Debug("loading job for queue", slog.String("queue", j.Queue)) if qc, ok := m.queues.Load(j.Queue); ok { queueChan = qc.(chan *jobs.Job) queueChan <- j @@ -314,14 +320,23 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) { func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) { ctx = withJobContext(ctx, job) - m.logger.Debug("handling job", "status", job.Status, "retries", job.Retries, "job_id", job.ID) + m.logger.Debug( + "handling job", + slog.String("status", job.Status), + slog.Int("retries", job.Retries), + slog.Int64("job_id", job.ID), + ) if job.Status != internal.JobStatusNew { job.Retries++ } if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) { - m.logger.Debug("job deadline is in the past, skipping", "deadline", job.Deadline, "job_id", job.ID) + m.logger.Debug( + "job deadline is in the past, skipping", + slog.Time("deadline", *job.Deadline), + slog.Int64("job_id", job.ID), + ) err = jobs.ErrJobExceededDeadline return } @@ -337,7 +352,12 @@ func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Han } if job.MaxRetries != nil && job.Retries >= *job.MaxRetries { - m.logger.Debug("job exceeded max retries", "retries", job.Retries, "max_retries", *job.MaxRetries, "job_id", job.ID) + m.logger.Debug( + "job exceeded max retries", + slog.Int("retries", job.Retries), + slog.Int("max_retries", *job.MaxRetries), + slog.Int64("job_id", job.ID), + ) err = jobs.ErrJobExceededMaxRetries } diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e476ff8..4ee71be 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -227,7 +227,7 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) { func (p *PgBackend) initializeDB() (err error) { migrations, err := iofs.New(migrationsFS, "migrations") if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -238,7 +238,7 @@ func (p *PgBackend) initializeDB() (err error) { var pgxCfg *pgx.ConnConfig pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString) if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -256,7 +256,7 @@ func (p *PgBackend) initializeDB() (err error) { sslMode) m, err := migrate.NewWithSourceInstance("iofs", migrations, pqConnectionString) if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } // We don't need the migration tooling to hold it's connections to the DB once it has been completed. @@ -264,7 +264,7 @@ func (p *PgBackend) initializeDB() (err error) { err = m.Up() if err != nil && !errors.Is(err, migrate.ErrNoChange) { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -278,9 +278,9 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e return } - p.logger.Debug("enqueueing job payload", "queue", job.Queue, slog.Any("job_payload", job.Payload)) + p.logger.Debug("enqueueing job payload", slog.String("queue", job.Queue), slog.Any("job_payload", job.Payload)) - p.logger.Debug("acquiring new connection from connection pool", "queue", job.Queue) + p.logger.Debug("acquiring new connection from connection pool", slog.String("queue", job.Queue)) conn, err := p.pool.Acquire(ctx) if err != nil { err = fmt.Errorf("error acquiring connection: %w", err) @@ -288,7 +288,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } defer conn.Release() - p.logger.Debug("beginning new transaction to enqueue job", "queue", job.Queue) + p.logger.Debug("beginning new transaction to enqueue job", slog.String("queue", job.Queue)) tx, err := conn.Begin(ctx) if err != nil { err = fmt.Errorf("error creating transaction: %w", err) @@ -307,7 +307,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e return } } - p.logger.Error("error enqueueing job", "queue", job.Queue, "error", err) + p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err)) err = fmt.Errorf("error enqueuing job: %w", err) } @@ -316,14 +316,19 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e err = fmt.Errorf("error committing transaction: %w", err) return } - p.logger.Debug("job added to queue:", "queue", job.Queue, "job_id", jobID) + p.logger.Debug("job added to queue:", slog.String("queue", job.Queue), slog.String("job_id", jobID)) // add future jobs to the future job list if job.RunAfter.After(time.Now().UTC()) { p.mu.Lock() p.futureJobs[jobID] = job p.mu.Unlock() - p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter) + p.logger.Debug( + "added job to future jobs list", + slog.String("queue", job.Queue), + slog.String("job_id", jobID), + slog.Time("run_after", job.RunAfter), + ) } return jobID, nil @@ -333,7 +338,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { ctx, cancel := context.WithCancel(ctx) - p.logger.Debug("starting job processing", "queue", h.Queue) + p.logger.Debug("starting job processing", slog.String("queue", h.Queue)) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) p.handlers[h.Queue] = h @@ -341,7 +346,7 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { err = p.start(ctx, h) if err != nil { - p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err) + p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err)) return } return @@ -353,13 +358,23 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) { cd, err := crondescriptor.NewCronDescriptor(cronSpec) if err != nil { - p.logger.Error("error creating cron descriptor", "queue", h.Queue, "cronspec", cronSpec, "error", err) + p.logger.Error( + "error creating cron descriptor", + slog.String("queue", h.Queue), + slog.String("cronspec", cronSpec), + slog.Any("error", err), + ) return fmt.Errorf("error creating cron descriptor: %w", err) } cdStr, err := cd.GetDescription(crondescriptor.Full) if err != nil { - p.logger.Error("error getting cron descriptor", "queue", h.Queue, "descriptor", crondescriptor.Full, "error", err) + p.logger.Error( + "error getting cron descriptor", + slog.String("queue", h.Queue), + slog.Any("descriptor", crondescriptor.Full), + slog.Any("error", err), + ) return fmt.Errorf("error getting cron description: %w", err) } @@ -382,7 +397,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha return } - p.logger.Error("error queueing cron job", "queue", h.Queue, "error", err) + p.logger.Error("error queueing cron job", slog.String("queue", h.Queue), slog.Any("error", err)) } }); err != nil { return fmt.Errorf("error adding cron: %w", err) @@ -398,7 +413,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) { // Shutdown shuts this backend down func (p *PgBackend) Shutdown(ctx context.Context) { - p.logger.Debug("starting shutdown.") + p.logger.Debug("starting shutdown") for queue := range p.handlers { p.announceJob(ctx, queue, shutdownJobID) } @@ -427,7 +442,7 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job return } - p.logger.Debug("adding job to the queue", "queue", j.Queue) + p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue)) err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`, j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID) @@ -474,7 +489,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { errMsg := "" if jobErr != nil { - p.logger.Error("job failed", "job_error", jobErr) + p.logger.Error("job failed", slog.Any("job_error", jobErr)) status = internal.JobStatusFailed errMsg = jobErr.Error() } @@ -558,7 +573,12 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { continue } - p.logger.Error("job failed", "queue", h.Queue, "error", err, "job_id", jobID) + p.logger.Error( + "job failed", + slog.String("queue", h.Queue), + slog.Any("error", err), + slog.String("job_id", jobID), + ) continue } @@ -574,7 +594,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) { rows, err := p.pool.Query(ctx, FutureJobQuery, queue) if err != nil { - p.logger.Error("failed to fetch future jobs list", "queue", queue, "error", err) + p.logger.Error("failed to fetch future jobs list", slog.String("queue", queue), slog.Any("error", err)) return } @@ -666,7 +686,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("failed to acquire database connection to listen for pending queue items", "queue", queue, "error", err) + p.logger.Error( + "failed to acquire database connection to listen for pending queue items", + slog.String("queue", queue), + slog.Any("error", err), + ) return } @@ -680,7 +704,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan break } - p.logger.Error("failed to fetch pending job", "queue", queue, "error", err, "job_id", jobID) + p.logger.Error( + "failed to fetch pending job", + slog.String("queue", queue), + slog.Any("error", err), + slog.String("job_id", jobID), + ) } else { jobsCh <- jobID } @@ -716,7 +745,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline - p.logger.Debug("job deadline is in the past, skipping", "queue", h.Queue, "job_id", job.ID) + p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", h.Queue), slog.Int64("job_id", job.ID)) err = p.updateJob(ctx, err) return } @@ -744,7 +773,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl err = tx.Commit(ctx) if err != nil { errMsg := "unable to commit job transaction. retrying this job may dupliate work:" - p.logger.Error(errMsg, "queue", h.Queue, "error", err, "job_id", job.ID) + p.logger.Error(errMsg, slog.String("queue", h.Queue), slog.Any("error", err), slog.Int64("job_id", job.ID)) return fmt.Errorf("%s %w", errMsg, err) } @@ -762,7 +791,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err) + p.logger.Error("unable to acquire new listener connection", slog.String("queue", queue), slog.Any("error", err)) return } defer p.release(ctx, conn, queue) @@ -771,7 +800,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re _, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection", "queue", queue, "error", err) + p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err)) return } @@ -780,13 +809,18 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) - p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err) + p.logger.Debug( + "job notification for queue", + slog.String("queue", queue), + slog.Any("notification", notification), + slog.Any("err", err), + ) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return } - p.logger.Error("failed to wait for notification", "queue", queue, "error", waitErr) + p.logger.Error("failed to wait for notification", slog.String("queue", queue), slog.Any("error", waitErr)) continue } @@ -810,7 +844,11 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin return } - p.logger.Error("unable to reset connection config before release", "queue", queue, "error", err) + p.logger.Error( + "unable to reset connection config before release", + slog.String("queue", queue), + slog.Any("error", err), + ) } conn.Release() diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index e413ec3..6d20f91 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -120,7 +120,7 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e SchedulerOpts: &asynq.SchedulerOpts{ PostEnqueueFunc: func(_ *asynq.TaskInfo, err error) { if err != nil { - b.logger.Error("unable to schedule task", err) + b.logger.Error("unable to schedule task", slog.Any("error", err)) } }, }, @@ -210,24 +210,24 @@ func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { taskID := t.ResultWriter().TaskID() var p map[string]any if err = json.Unmarshal(t.Payload(), &p); err != nil { - b.logger.Info("job has no payload", "task_id", taskID) + b.logger.Info("job has no payload", slog.String("task_id", taskID)) } ti, err := b.inspector.GetTaskInfo(defaultAsynqQueue, taskID) if err != nil { - b.logger.Error("unable to process job", "error", err) + b.logger.Error("unable to process job", slog.Any("error", err)) return } if !ti.Deadline.IsZero() && ti.Deadline.UTC().Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline - b.logger.Debug("job deadline is in the past, skipping", "task_id", taskID) + b.logger.Debug("job deadline is in the past, skipping", slog.String("task_id", taskID)) return } if ti.Retried >= ti.MaxRetry { err = jobs.ErrJobExceededMaxRetries - b.logger.Debug("job has exceeded the maximum number of retries, skipping", "task_id", taskID) + b.logger.Debug("job has exceeded the maximum number of retries, skipping", slog.String("task_id", taskID)) return } @@ -243,7 +243,7 @@ func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { ctx = withJobContext(ctx, job) err = handler.Exec(ctx, h) if err != nil { - b.logger.Error("error handling job", "error", err) + b.logger.Error("error handling job", slog.Any("error", err)) } return From 4dc2c7f389644eb5f5e949f55f51941ba0ceb325 Mon Sep 17 00:00:00 2001 From: Elliot Courant Date: Sun, 5 Nov 2023 20:25:00 -0600 Subject: [PATCH 2/3] chore: Tweak logging test --- neoq_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neoq_test.go b/neoq_test.go index 71e80f3..ebe6e44 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -224,7 +224,7 @@ func TestSetLogger(t *testing.T) { t.Error(err) } - expectedLogMsg := "adding a new job [queue testing]" + expectedLogMsg := "adding a new job [queue=testing]" results_loop: for { select { From d9919125c9de4599b9fc397c2b8a4effeeff2992 Mon Sep 17 00:00:00 2001 From: Elliot Courant Date: Sun, 5 Nov 2023 20:29:25 -0600 Subject: [PATCH 3/3] chore: Tweaking tests --- backends/redis/redis_backend_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index 9e2ecda..aefdb9c 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -296,7 +296,7 @@ func TestJobProcessingWithOptions(t *testing.T) { t.Error(e) } - expectedLogMsg := "error handling job [error job exceeded its 1ms timeout: context deadline exceeded]" //nolint: dupword + expectedLogMsg := "error handling job [error=job exceeded its 1ms timeout: context deadline exceeded]" //nolint: dupword select { case <-timeoutTimer: err = jobs.ErrJobTimeout