Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(log): Tweaking logging again, making everything consistent #101

Merged
merged 3 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions backends/memory/memory_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (m *MemBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string,
job.Status = internal.JobStatusNew
}

m.logger.Debug("adding a new job", "queue", job.Queue)
m.logger.Debug("adding a new job", slog.String("queue", job.Queue))

if qc, ok = m.queues.Load(job.Queue); !ok {
return jobs.UnqueuedJobID, fmt.Errorf("%w: %s", handler.ErrNoProcessorForQueue, job.Queue)
Expand Down Expand Up @@ -213,12 +213,12 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {

if ht, ok = m.handlers.Load(queue); !ok {
err = fmt.Errorf("%w: %s", handler.ErrNoHandlerForQueue, queue)
m.logger.Error("error loading handler for queue", "queue", queue)
m.logger.Error("error loading handler for queue", slog.String("queue", queue))
return
}

if qc, ok = m.queues.Load(queue); !ok {
m.logger.Error("error loading channel for queue", "queue", queue, "error", handler.ErrNoHandlerForQueue)
m.logger.Error("error loading channel for queue", slog.String("queue", queue), slog.Any("error", handler.ErrNoHandlerForQueue))
return err
}

Expand Down Expand Up @@ -247,7 +247,7 @@ func (m *MemBackend) start(ctx context.Context, queue string) (err error) {
return
}

m.logger.Error("job failed", "job_id", job.ID, "error", err)
m.logger.Error("job failed", slog.Int64("job_id", job.ID), slog.Any("error", err))

runAfter := internal.CalculateBackoff(job.Retries)
job.RunAfter = runAfter
Expand Down Expand Up @@ -286,11 +286,17 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
timeUntilRunAfter := time.Until(job.RunAfter)
if timeUntilRunAfter <= m.config.FutureJobWindow {
m.removeFutureJob(job.ID)
m.logger.Debug("dequeued job", "queue", job.Queue, "retries", job.Retries, "next_run", timeUntilRunAfter, "job_id", job.ID)
m.logger.Debug(
"dequeued job",
slog.String("queue", job.Queue),
slog.Int("retries", job.Retries),
slog.String("next_run", timeUntilRunAfter.String()),
slog.Int64("job_id", job.ID),
)
go func(j *jobs.Job) {
scheduleCh := time.After(timeUntilRunAfter)
<-scheduleCh
m.logger.Debug("loading job for queue", "queue", j.Queue)
m.logger.Debug("loading job for queue", slog.String("queue", j.Queue))
if qc, ok := m.queues.Load(j.Queue); ok {
queueChan = qc.(chan *jobs.Job)
queueChan <- j
Expand All @@ -314,14 +320,23 @@ func (m *MemBackend) scheduleFutureJobs(ctx context.Context) {
func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Handler) (err error) {
ctx = withJobContext(ctx, job)

m.logger.Debug("handling job", "status", job.Status, "retries", job.Retries, "job_id", job.ID)
m.logger.Debug(
"handling job",
slog.String("status", job.Status),
slog.Int("retries", job.Retries),
slog.Int64("job_id", job.ID),
)

if job.Status != internal.JobStatusNew {
job.Retries++
}

if job.Deadline != nil && job.Deadline.UTC().Before(time.Now().UTC()) {
m.logger.Debug("job deadline is in the past, skipping", "deadline", job.Deadline, "job_id", job.ID)
m.logger.Debug(
"job deadline is in the past, skipping",
slog.Time("deadline", *job.Deadline),
slog.Int64("job_id", job.ID),
)
err = jobs.ErrJobExceededDeadline
return
}
Expand All @@ -337,7 +352,12 @@ func (m *MemBackend) handleJob(ctx context.Context, job *jobs.Job, h handler.Han
}

if job.MaxRetries != nil && job.Retries >= *job.MaxRetries {
m.logger.Debug("job exceeded max retries", "retries", job.Retries, "max_retries", *job.MaxRetries, "job_id", job.ID)
m.logger.Debug(
"job exceeded max retries",
slog.Int("retries", job.Retries),
slog.Int("max_retries", *job.MaxRetries),
slog.Int64("job_id", job.ID),
)
err = jobs.ErrJobExceededMaxRetries
}

Expand Down
96 changes: 67 additions & 29 deletions backends/postgres/postgres_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func txFromContext(ctx context.Context) (t pgx.Tx, err error) {
func (p *PgBackend) initializeDB() (err error) {
migrations, err := iofs.New(migrationsFS, "migrations")
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

Expand All @@ -238,7 +238,7 @@ func (p *PgBackend) initializeDB() (err error) {
var pgxCfg *pgx.ConnConfig
pgxCfg, err = pgx.ParseConfig(p.config.ConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

Expand All @@ -256,15 +256,15 @@ func (p *PgBackend) initializeDB() (err error) {
sslMode)
m, err := migrate.NewWithSourceInstance("iofs", migrations, pqConnectionString)
if err != nil {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}
// We don't need the migration tooling to hold it's connections to the DB once it has been completed.
defer m.Close()

err = m.Up()
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
p.logger.Error("unable to run migrations", "error", err)
p.logger.Error("unable to run migrations", slog.Any("error", err))
return
}

Expand All @@ -278,17 +278,17 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
return
}

p.logger.Debug("enqueueing job payload", "queue", job.Queue, slog.Any("job_payload", job.Payload))
p.logger.Debug("enqueueing job payload", slog.String("queue", job.Queue), slog.Any("job_payload", job.Payload))

p.logger.Debug("acquiring new connection from connection pool", "queue", job.Queue)
p.logger.Debug("acquiring new connection from connection pool", slog.String("queue", job.Queue))
conn, err := p.pool.Acquire(ctx)
if err != nil {
err = fmt.Errorf("error acquiring connection: %w", err)
return
}
defer conn.Release()

p.logger.Debug("beginning new transaction to enqueue job", "queue", job.Queue)
p.logger.Debug("beginning new transaction to enqueue job", slog.String("queue", job.Queue))
tx, err := conn.Begin(ctx)
if err != nil {
err = fmt.Errorf("error creating transaction: %w", err)
Expand All @@ -307,7 +307,7 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
return
}
}
p.logger.Error("error enqueueing job", "queue", job.Queue, "error", err)
p.logger.Error("error enqueueing job", slog.String("queue", job.Queue), slog.Any("error", err))
err = fmt.Errorf("error enqueuing job: %w", err)
}

Expand All @@ -316,14 +316,19 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
err = fmt.Errorf("error committing transaction: %w", err)
return
}
p.logger.Debug("job added to queue:", "queue", job.Queue, "job_id", jobID)
p.logger.Debug("job added to queue:", slog.String("queue", job.Queue), slog.String("job_id", jobID))

// add future jobs to the future job list
if job.RunAfter.After(time.Now().UTC()) {
p.mu.Lock()
p.futureJobs[jobID] = job
p.mu.Unlock()
p.logger.Debug("added job to future jobs list", "queue", job.Queue, "job_id", jobID, "run_after", job.RunAfter)
p.logger.Debug(
"added job to future jobs list",
slog.String("queue", job.Queue),
slog.String("job_id", jobID),
slog.Time("run_after", job.RunAfter),
)
}

return jobID, nil
Expand All @@ -333,15 +338,15 @@ func (p *PgBackend) Enqueue(ctx context.Context, job *jobs.Job) (jobID string, e
func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
ctx, cancel := context.WithCancel(ctx)

p.logger.Debug("starting job processing", "queue", h.Queue)
p.logger.Debug("starting job processing", slog.String("queue", h.Queue))
p.mu.Lock()
p.cancelFuncs = append(p.cancelFuncs, cancel)
p.handlers[h.Queue] = h
p.mu.Unlock()

err = p.start(ctx, h)
if err != nil {
p.logger.Error("unable to start processing queue", "queue", h.Queue, "error", err)
p.logger.Error("unable to start processing queue", slog.String("queue", h.Queue), slog.Any("error", err))
return
}
return
Expand All @@ -353,13 +358,23 @@ func (p *PgBackend) Start(ctx context.Context, h handler.Handler) (err error) {
func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Handler) (err error) {
cd, err := crondescriptor.NewCronDescriptor(cronSpec)
if err != nil {
p.logger.Error("error creating cron descriptor", "queue", h.Queue, "cronspec", cronSpec, "error", err)
p.logger.Error(
"error creating cron descriptor",
slog.String("queue", h.Queue),
slog.String("cronspec", cronSpec),
slog.Any("error", err),
)
return fmt.Errorf("error creating cron descriptor: %w", err)
}

cdStr, err := cd.GetDescription(crondescriptor.Full)
if err != nil {
p.logger.Error("error getting cron descriptor", "queue", h.Queue, "descriptor", crondescriptor.Full, "error", err)
p.logger.Error(
"error getting cron descriptor",
slog.String("queue", h.Queue),
slog.Any("descriptor", crondescriptor.Full),
slog.Any("error", err),
)
return fmt.Errorf("error getting cron description: %w", err)
}

Expand All @@ -382,7 +397,7 @@ func (p *PgBackend) StartCron(ctx context.Context, cronSpec string, h handler.Ha
return
}

p.logger.Error("error queueing cron job", "queue", h.Queue, "error", err)
p.logger.Error("error queueing cron job", slog.String("queue", h.Queue), slog.Any("error", err))
}
}); err != nil {
return fmt.Errorf("error adding cron: %w", err)
Expand All @@ -398,7 +413,7 @@ func (p *PgBackend) SetLogger(logger logging.Logger) {

// Shutdown shuts this backend down
func (p *PgBackend) Shutdown(ctx context.Context) {
p.logger.Debug("starting shutdown.")
p.logger.Debug("starting shutdown")
for queue := range p.handlers {
p.announceJob(ctx, queue, shutdownJobID)
}
Expand Down Expand Up @@ -427,7 +442,7 @@ func (p *PgBackend) enqueueJob(ctx context.Context, tx pgx.Tx, j *jobs.Job) (job
return
}

p.logger.Debug("adding job to the queue", "queue", j.Queue)
p.logger.Debug("adding job to the queue", slog.String("queue", j.Queue))
err = tx.QueryRow(ctx, `INSERT INTO neoq_jobs(queue, fingerprint, payload, run_after, deadline, max_retries)
VALUES ($1, $2, $3, $4, $5, $6) RETURNING id`,
j.Queue, j.Fingerprint, j.Payload, j.RunAfter, j.Deadline, j.MaxRetries).Scan(&jobID)
Expand Down Expand Up @@ -474,7 +489,7 @@ func (p *PgBackend) updateJob(ctx context.Context, jobErr error) (err error) {
errMsg := ""

if jobErr != nil {
p.logger.Error("job failed", "job_error", jobErr)
p.logger.Error("job failed", slog.Any("job_error", jobErr))
status = internal.JobStatusFailed
errMsg = jobErr.Error()
}
Expand Down Expand Up @@ -558,7 +573,12 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
continue
}

p.logger.Error("job failed", "queue", h.Queue, "error", err, "job_id", jobID)
p.logger.Error(
"job failed",
slog.String("queue", h.Queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)

continue
}
Expand All @@ -574,7 +594,7 @@ func (p *PgBackend) start(ctx context.Context, h handler.Handler) (err error) {
func (p *PgBackend) initFutureJobs(ctx context.Context, queue string) (err error) {
rows, err := p.pool.Query(ctx, FutureJobQuery, queue)
if err != nil {
p.logger.Error("failed to fetch future jobs list", "queue", queue, "error", err)
p.logger.Error("failed to fetch future jobs list", slog.String("queue", queue), slog.Any("error", err))
return
}

Expand Down Expand Up @@ -666,7 +686,11 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan

conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("failed to acquire database connection to listen for pending queue items", "queue", queue, "error", err)
p.logger.Error(
"failed to acquire database connection to listen for pending queue items",
slog.String("queue", queue),
slog.Any("error", err),
)
return
}

Expand All @@ -680,7 +704,12 @@ func (p *PgBackend) pendingJobs(ctx context.Context, queue string) (jobsCh chan
break
}

p.logger.Error("failed to fetch pending job", "queue", queue, "error", err, "job_id", jobID)
p.logger.Error(
"failed to fetch pending job",
slog.String("queue", queue),
slog.Any("error", err),
slog.String("job_id", jobID),
)
} else {
jobsCh <- jobID
}
Expand Down Expand Up @@ -716,7 +745,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl

if job.Deadline != nil && job.Deadline.Before(time.Now().UTC()) {
err = jobs.ErrJobExceededDeadline
p.logger.Debug("job deadline is in the past, skipping", "queue", h.Queue, "job_id", job.ID)
p.logger.Debug("job deadline is in the past, skipping", slog.String("queue", h.Queue), slog.Int64("job_id", job.ID))
err = p.updateJob(ctx, err)
return
}
Expand Down Expand Up @@ -744,7 +773,7 @@ func (p *PgBackend) handleJob(ctx context.Context, jobID string, h handler.Handl
err = tx.Commit(ctx)
if err != nil {
errMsg := "unable to commit job transaction. retrying this job may dupliate work:"
p.logger.Error(errMsg, "queue", h.Queue, "error", err, "job_id", job.ID)
p.logger.Error(errMsg, slog.String("queue", h.Queue), slog.Any("error", err), slog.Int64("job_id", job.ID))
return fmt.Errorf("%s %w", errMsg, err)
}

Expand All @@ -762,7 +791,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
go func(ctx context.Context) {
conn, err := p.pool.Acquire(ctx)
if err != nil {
p.logger.Error("unable to acquire new listener connection", "queue", queue, "error", err)
p.logger.Error("unable to acquire new listener connection", slog.String("queue", queue), slog.Any("error", err))
return
}
defer p.release(ctx, conn, queue)
Expand All @@ -771,7 +800,7 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re
_, err = conn.Exec(ctx, fmt.Sprintf(`SET idle_in_transaction_session_timeout = '0'; LISTEN %q`, queue))
if err != nil {
err = fmt.Errorf("unable to configure listener connection: %w", err)
p.logger.Error("unable to configure listener connection", "queue", queue, "error", err)
p.logger.Error("unable to configure listener connection", slog.String("queue", queue), slog.Any("error", err))
return
}

Expand All @@ -780,13 +809,18 @@ func (p *PgBackend) listen(ctx context.Context, queue string) (c chan string, re

for {
notification, waitErr := conn.Conn().WaitForNotification(ctx)
p.logger.Debug("job notification for queue", "queue", queue, "notification", notification, "err", err)
p.logger.Debug(
"job notification for queue",
slog.String("queue", queue),
slog.Any("notification", notification),
slog.Any("err", err),
)
if waitErr != nil {
if errors.Is(waitErr, context.Canceled) {
return
}

p.logger.Error("failed to wait for notification", "queue", queue, "error", waitErr)
p.logger.Error("failed to wait for notification", slog.String("queue", queue), slog.Any("error", waitErr))
continue
}

Expand All @@ -810,7 +844,11 @@ func (p *PgBackend) release(ctx context.Context, conn *pgxpool.Conn, queue strin
return
}

p.logger.Error("unable to reset connection config before release", "queue", queue, "error", err)
p.logger.Error(
"unable to reset connection config before release",
slog.String("queue", queue),
slog.Any("error", err),
)
}

conn.Release()
Expand Down
Loading
Loading