diff --git a/backends/memory/memory_backend.go b/backends/memory/memory_backend.go index bf1f64a..b0ec080 100644 --- a/backends/memory/memory_backend.go +++ b/backends/memory/memory_backend.go @@ -81,7 +81,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, job.Status = internal.JobStatusNew } - m.logger.Debug("adding a new job", "queue", job.Queue) + m.logger.Debug("adding a new job", slog.String("queue", job.Queue)) if qc, ok = m.queues.Load(job.Queue); !ok { return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue) @@ -213,12 +213,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { if ht, ok = m.handlers.Load(queue); !ok { err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue) - m.logger.Error("error loading handler for queue", "queue", queue) + m.logger.Error("error loading handler for queue", slog.String("queue", queue)) return } if qc, ok = m.queues.Load(queue); !ok { - m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue) + m.logger.Error("error loading channel for queue", slog.String("queue", queue), slog.Any("error", handler.ErrNoHandlerForQueue)) return err } @@ -247,7 +247,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) { return } - m.logger.Error("job failed", "job_id", job.ID, "error", err) + m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err)) runAfter := internal.CalculateBackoff(job.Retries) job.RunAfter = runAfter @@ -286,11 +286,17 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) { timeUntilRunAfter := time.Until(job.RunAfter) if timeUntilRunAfter <= m.config.FutureJobWindow { m.removeFutureJob(job.ID) - m.logger.Debug("dequeued job", "queue", job.Queue, "retries", job.Retries, "next_run", timeUntilRunAfter, "job_id", job.ID) + m.logger.Debug( + "dequeued job", + slog.String("queue", job.Queue), + slog.Int("retries", job.Retries), + slog.String("next_run", timeUntilRunAfter.String()), + slog.Int64("job_id", job.ID), + ) go func(j *jobs.Job) { scheduleCh := time.After(timeUntilRunAfter) <-scheduleCh - m.logger.Debug("loading job for queue", "queue", j.Queue) + m.logger.Debug("loading job for queue", slog.String("queue", j.Queue)) if qc, ok := m.queues.Load(j.Queue); ok { queueChan = qc.(chan *jobs.Job) queueChan <- j @@ -314,14 +320,23 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) { func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) { ctx = withJobContext(ctx, job) - m.logger.Debug("handling job", "status", job.Status, "retries", job.Retries, "job_id", job.ID) + m.logger.Debug( + "handling job", + slog.String("status", job.Status), + slog.Int("retries", job.Retries), + slog.Int64("job_id", job.ID), + ) if job.Status != internal.JobStatusNew { job.Retries++ } if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) { - m.logger.Debug("job deadline is in the past, skipping", "deadline", job.Deadline, "job_id", job.ID) + m.logger.Debug( + "job deadline is in the past, skipping", + slog.Time("deadline", *job.Deadline), + slog.Int64("job_id", job.ID), + ) err = jobs.ErrJobExceededDeadline return } @@ -337,7 +352,12 @@ func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Han } if job.MaxRetries != nil && job.Retries >= *job.MaxRetries { - m.logger.Debug("job exceeded max retries", "retries", job.Retries, "max_retries", *job.MaxRetries, "job_id", job.ID) + m.logger.Debug( + "job exceeded max retries", + slog.Int("retries", job.Retries), + slog.Int("max_retries", *job.MaxRetries), + slog.Int64("job_id", job.ID), + ) err = jobs.ErrJobExceededMaxRetries } diff --git a/backends/postgres/postgres_backend.go b/backends/postgres/postgres_backend.go index e476ff8..4ee71be 100644 --- a/backends/postgres/postgres_backend.go +++ b/backends/postgres/postgres_backend.go @@ -227,7 +227,7 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) { func (p *PgBackend) initializeDB() (err error) { migrations, err := iofs.New(migrationsFS, "migrations") if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -238,7 +238,7 @@ func (p *PgBackend) initializeDB() (err error) { var pgxCfg *pgx.ConnConfig pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString) if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -256,7 +256,7 @@ func (p *PgBackend) initializeDB() (err error) { sslMode) m, err := migrate.NewWithSourceInstance("iofs", migrations, pqConnectionString) if err != nil { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } // We don't need the migration tooling to hold it's connections to the DB once it has been completed. @@ -264,7 +264,7 @@ func (p *PgBackend) initializeDB() (err error) { err = m.Up() if err != nil && !errors.Is(err, migrate.ErrNoChange) { - p.logger.Error("unable to run migrations", "error", err) + p.logger.Error("unable to run migrations", slog.Any("error", err)) return } @@ -278,9 +278,9 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e return } - p.logger.Debug("enqueueing job payload", "queue", job.Queue, slog.Any("job_payload", job.Payload)) + p.logger.Debug("enqueueing job payload", slog.String("queue", job.Queue), slog.Any("job_payload", job.Payload)) - p.logger.Debug("acquiring new connection from connection pool", "queue", job.Queue) + p.logger.Debug("acquiring new connection from connection pool", slog.String("queue", job.Queue)) conn, err := p.pool.Acquire(ctx) if err != nil { err = fmt.Errorf("error acquiring connection: %w", err) @@ -288,7 +288,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e } defer conn.Release() - p.logger.Debug("beginning new transaction to enqueue job", "queue", job.Queue) + p.logger.Debug("beginning new transaction to enqueue job", slog.String("queue", job.Queue)) tx, err := conn.Begin(ctx) if err != nil { err = fmt.Errorf("error creating transaction: %w", err) @@ -307,7 +307,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e return } } - p.logger.Error("error enqueueing job", "queue", job.Queue, "error", err) + p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err)) err = fmt.Errorf("error enqueuing job: %w", err) } @@ -316,14 +316,19 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e err = fmt.Errorf("error committing transaction: %w", err) return } - p.logger.Debug("job added to queue:", "queue", job.Queue, "job_id", jobID) + p.logger.Debug("job added to queue:", slog.String("queue", job.Queue), slog.String("job_id", jobID)) // add future jobs to the future job list if job.RunAfter.After(time.Now().UTC()) { p.mu.Lock() p.futureJobs[jobID] = job p.mu.Unlock() - p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter) + p.logger.Debug( + "added job to future jobs list", + slog.String("queue", job.Queue), + slog.String("job_id", jobID), + slog.Time("run_after", job.RunAfter), + ) } return jobID, nil @@ -333,7 +338,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { ctx, cancel := context.WithCancel(ctx) - p.logger.Debug("starting job processing", "queue", h.Queue) + p.logger.Debug("starting job processing", slog.String("queue", h.Queue)) p.mu.Lock() p.cancelFuncs = append(p.cancelFuncs, cancel) p.handlers[h.Queue] = h @@ -341,7 +346,7 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { err = p.start(ctx, h) if err != nil { - p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err) + p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err)) return } return @@ -353,13 +358,23 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) { func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) { cd, err := crondescriptor.NewCronDescriptor(cronSpec) if err != nil { - p.logger.Error("error creating cron descriptor", "queue", h.Queue, "cronspec", cronSpec, "error", err) + p.logger.Error( + "error creating cron descriptor", + slog.String("queue", h.Queue), + slog.String("cronspec", cronSpec), + slog.Any("error", err), + ) return fmt.Errorf("error creating cron descriptor: %w", err) } cdStr, err := cd.GetDescription(crondescriptor.Full) if err != nil { - p.logger.Error("error getting cron descriptor", "queue", h.Queue, "descriptor", crondescriptor.Full, "error", err) + p.logger.Error( + "error getting cron descriptor", + slog.String("queue", h.Queue), + slog.Any("descriptor", crondescriptor.Full), + slog.Any("error", err), + ) return fmt.Errorf("error getting cron description: %w", err) } @@ -382,7 +397,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha return } - p.logger.Error("error queueing cron job", "queue", h.Queue, "error", err) + p.logger.Error("error queueing cron job", slog.String("queue", h.Queue), slog.Any("error", err)) } }); err != nil { return fmt.Errorf("error adding cron: %w", err) @@ -398,7 +413,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) { // Shutdown shuts this backend down func (p *PgBackend) Shutdown(ctx context.Context) { - p.logger.Debug("starting shutdown.") + p.logger.Debug("starting shutdown") for queue := range p.handlers { p.announceJob(ctx, queue, shutdownJobID) } @@ -427,7 +442,7 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job return } - p.logger.Debug("adding job to the queue", "queue", j.Queue) + p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue)) err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`, j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID) @@ -474,7 +489,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) { errMsg := "" if jobErr != nil { - p.logger.Error("job failed", "job_error", jobErr) + p.logger.Error("job failed", slog.Any("job_error", jobErr)) status = internal.JobStatusFailed errMsg = jobErr.Error() } @@ -558,7 +573,12 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { continue } - p.logger.Error("job failed", "queue", h.Queue, "error", err, "job_id", jobID) + p.logger.Error( + "job failed", + slog.String("queue", h.Queue), + slog.Any("error", err), + slog.String("job_id", jobID), + ) continue } @@ -574,7 +594,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) { func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) { rows, err := p.pool.Query(ctx, FutureJobQuery, queue) if err != nil { - p.logger.Error("failed to fetch future jobs list", "queue", queue, "error", err) + p.logger.Error("failed to fetch future jobs list", slog.String("queue", queue), slog.Any("error", err)) return } @@ -666,7 +686,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("failed to acquire database connection to listen for pending queue items", "queue", queue, "error", err) + p.logger.Error( + "failed to acquire database connection to listen for pending queue items", + slog.String("queue", queue), + slog.Any("error", err), + ) return } @@ -680,7 +704,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan break } - p.logger.Error("failed to fetch pending job", "queue", queue, "error", err, "job_id", jobID) + p.logger.Error( + "failed to fetch pending job", + slog.String("queue", queue), + slog.Any("error", err), + slog.String("job_id", jobID), + ) } else { jobsCh <- jobID } @@ -716,7 +745,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline - p.logger.Debug("job deadline is in the past, skipping", "queue", h.Queue, "job_id", job.ID) + p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", h.Queue), slog.Int64("job_id", job.ID)) err = p.updateJob(ctx, err) return } @@ -744,7 +773,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl err = tx.Commit(ctx) if err != nil { errMsg := "unable to commit job transaction. retrying this job may dupliate work:" - p.logger.Error(errMsg, "queue", h.Queue, "error", err, "job_id", job.ID) + p.logger.Error(errMsg, slog.String("queue", h.Queue), slog.Any("error", err), slog.Int64("job_id", job.ID)) return fmt.Errorf("%s %w", errMsg, err) } @@ -762,7 +791,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re go func(ctx context.Context) { conn, err := p.pool.Acquire(ctx) if err != nil { - p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err) + p.logger.Error("unable to acquire new listener connection", slog.String("queue", queue), slog.Any("error", err)) return } defer p.release(ctx, conn, queue) @@ -771,7 +800,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re _, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue)) if err != nil { err = fmt.Errorf("unable to configure listener connection: %w", err) - p.logger.Error("unable to configure listener connection", "queue", queue, "error", err) + p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err)) return } @@ -780,13 +809,18 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re for { notification, waitErr := conn.Conn().WaitForNotification(ctx) - p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err) + p.logger.Debug( + "job notification for queue", + slog.String("queue", queue), + slog.Any("notification", notification), + slog.Any("err", err), + ) if waitErr != nil { if errors.Is(waitErr, context.Canceled) { return } - p.logger.Error("failed to wait for notification", "queue", queue, "error", waitErr) + p.logger.Error("failed to wait for notification", slog.String("queue", queue), slog.Any("error", waitErr)) continue } @@ -810,7 +844,11 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin return } - p.logger.Error("unable to reset connection config before release", "queue", queue, "error", err) + p.logger.Error( + "unable to reset connection config before release", + slog.String("queue", queue), + slog.Any("error", err), + ) } conn.Release() diff --git a/backends/redis/redis_backend.go b/backends/redis/redis_backend.go index e413ec3..6d20f91 100644 --- a/backends/redis/redis_backend.go +++ b/backends/redis/redis_backend.go @@ -120,7 +120,7 @@ func Backend(_ context.Context, opts ...neoq.ConfigOption) (backend neoq.Neoq, e SchedulerOpts: &asynq.SchedulerOpts{ PostEnqueueFunc: func(_ *asynq.TaskInfo, err error) { if err != nil { - b.logger.Error("unable to schedule task", err) + b.logger.Error("unable to schedule task", slog.Any("error", err)) } }, }, @@ -210,24 +210,24 @@ func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { taskID := t.ResultWriter().TaskID() var p map[string]any if err = json.Unmarshal(t.Payload(), &p); err != nil { - b.logger.Info("job has no payload", "task_id", taskID) + b.logger.Info("job has no payload", slog.String("task_id", taskID)) } ti, err := b.inspector.GetTaskInfo(defaultAsynqQueue, taskID) if err != nil { - b.logger.Error("unable to process job", "error", err) + b.logger.Error("unable to process job", slog.Any("error", err)) return } if !ti.Deadline.IsZero() && ti.Deadline.UTC().Before(time.Now().UTC()) { err = jobs.ErrJobExceededDeadline - b.logger.Debug("job deadline is in the past, skipping", "task_id", taskID) + b.logger.Debug("job deadline is in the past, skipping", slog.String("task_id", taskID)) return } if ti.Retried >= ti.MaxRetry { err = jobs.ErrJobExceededMaxRetries - b.logger.Debug("job has exceeded the maximum number of retries, skipping", "task_id", taskID) + b.logger.Debug("job has exceeded the maximum number of retries, skipping", slog.String("task_id", taskID)) return } @@ -243,7 +243,7 @@ func (b *RedisBackend) Start(_ context.Context, h handler.Handler) (err error) { ctx = withJobContext(ctx, job) err = handler.Exec(ctx, h) if err != nil { - b.logger.Error("error handling job", "error", err) + b.logger.Error("error handling job", slog.Any("error", err)) } return diff --git a/backends/redis/redis_backend_test.go b/backends/redis/redis_backend_test.go index 9e2ecda..aefdb9c 100644 --- a/backends/redis/redis_backend_test.go +++ b/backends/redis/redis_backend_test.go @@ -296,7 +296,7 @@ func TestJobProcessingWithOptions(t *testing.T) { t.Error(e) } - expectedLogMsg := "error handling job [error job exceeded its 1ms timeout: context deadline exceeded]" //nolint: dupword + expectedLogMsg := "error handling job [error=job exceeded its 1ms timeout: context deadline exceeded]" //nolint: dupword select { case <-timeoutTimer: err = jobs.ErrJobTimeout diff --git a/neoq_test.go b/neoq_test.go index 71e80f3..ebe6e44 100644 --- a/neoq_test.go +++ b/neoq_test.go @@ -224,7 +224,7 @@ func TestSetLogger(t *testing.T) { t.Error(err) } - expectedLogMsg := "adding a new job [queue testing]" + expectedLogMsg := "adding a new job [queue=testing]" results_loop: for { select {