From 83102008ae0833dedb7e62555e11ddac5b03c013 Mon Sep 17 00:00:00 2001 From: Douglas Thrift Date: Wed, 15 May 2024 15:19:20 -0700 Subject: [PATCH] Fix various sync.Mutex double unlocks --- examples/weather/services/poller/worker.go | 2 +- pool/node.go | 1 - pool/worker.go | 5 +---- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/examples/weather/services/poller/worker.go b/examples/weather/services/poller/worker.go index cfff9d3..4f85500 100644 --- a/examples/weather/services/poller/worker.go +++ b/examples/weather/services/poller/worker.go @@ -1,5 +1,5 @@ /* -Package poller implments a simple poller for the weather service. +Package poller implements a simple poller for the weather service. It consists of a Pulse worker that periodically calls the weather service to retrieve the weather forecast for given US cities. */ diff --git a/pool/node.go b/pool/node.go index 2b4b605..fad9bbd 100644 --- a/pool/node.go +++ b/pool/node.go @@ -488,7 +488,6 @@ func (node *Node) ackWorkerEvent(ctx context.Context, ev *streaming.Event) { pending, ok := node.pendingEvents[key] if !ok { node.logger.Error(fmt.Errorf("received event %s from worker %s that was not dispatched", ack.EventID, workerID)) - node.lock.Unlock() return } diff --git a/pool/worker.go b/pool/worker.go index 6187667..1c956e1 100644 --- a/pool/worker.go +++ b/pool/worker.go @@ -192,7 +192,7 @@ func (w *Worker) stop(ctx context.Context) { w.lock.Lock() defer w.lock.Unlock() if w.stopped { - w.lock.Unlock() + return } w.stopped = true var err error @@ -233,7 +233,6 @@ func (w *Worker) startJob(ctx context.Context, job *Job) error { w.lock.Lock() defer w.lock.Unlock() if w.stopped { - w.lock.Unlock() return fmt.Errorf("worker %q stopped", w.ID) } job.Worker = w @@ -251,7 +250,6 @@ func (w *Worker) stopJob(ctx context.Context, key string) error { w.lock.Lock() defer w.lock.Unlock() if w.stopped { - w.lock.Unlock() return nil } if _, ok := w.jobs[key]; !ok { @@ -270,7 +268,6 @@ func (w *Worker) notify(ctx context.Context, key string, payload []byte) error { w.lock.Lock() defer w.lock.Unlock() if w.stopped { - w.lock.Unlock() w.logger.Debug("worker stopped, ignoring notification") return nil }