Skip to content

Commit 8cf45ce

Browse files
author
Chodor Marek
committed
Making channels for deadlock detector only when it is enabled
Signed-off-by: Chodor Marek <[email protected]>
1 parent f22255d commit 8cf45ce

File tree

1 file changed

+27
-24
lines changed

1 file changed

+27
-24
lines changed

Diff for: cmd/ingester/app/consumer/deadlock_detector.go

+27-24
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,14 @@ type partitionDeadlockDetector struct {
5252
closePartition chan struct{}
5353
done chan struct{}
5454
incrementAllPartitionMsgCount func()
55-
closed bool
55+
disabled bool
5656
}
5757

5858
type allPartitionsDeadlockDetector struct {
5959
msgConsumed *uint64
6060
logger *zap.Logger
6161
done chan struct{}
62-
closed bool
62+
disabled bool
6363
}
6464

6565
func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
@@ -84,12 +84,10 @@ func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, int
8484
func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector {
8585
var msgConsumed uint64
8686
w := &partitionDeadlockDetector{
87-
msgConsumed: &msgConsumed,
88-
partition: partition,
89-
closePartition: make(chan struct{}, 1),
90-
done: make(chan struct{}),
91-
logger: s.logger,
92-
closed: false,
87+
msgConsumed: &msgConsumed,
88+
partition: partition,
89+
logger: s.logger,
90+
disabled: 0 == s.interval,
9391

9492
incrementAllPartitionMsgCount: func() {
9593
s.allPartitionsDeadlockDetector.incrementMsgCount()
@@ -98,8 +96,9 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti
9896

9997
if s.interval == 0 {
10098
s.logger.Debug("Partition deadlock detector disabled")
101-
w.closed = true
10299
} else {
100+
w.closePartition = make(chan struct{}, 1)
101+
w.done = make(chan struct{})
103102
go s.monitorForPartition(w, partition)
104103
}
105104

@@ -141,16 +140,15 @@ func (s *deadlockDetector) start() {
141140
var msgConsumed uint64
142141
detector := &allPartitionsDeadlockDetector{
143142
msgConsumed: &msgConsumed,
144-
done: make(chan struct{}),
145143
logger: s.logger,
146-
closed: false,
144+
disabled: 0 == s.interval,
147145
}
148146

149147
if s.interval == 0 {
150148
s.logger.Debug("Global deadlock detector disabled")
151-
detector.closed = true
152149
} else {
153150
s.logger.Debug("Starting global deadlock detector")
151+
detector.done = make(chan struct{})
154152
go func() {
155153
ticker := time.NewTicker(s.interval)
156154
defer ticker.Stop()
@@ -175,34 +173,39 @@ func (s *deadlockDetector) start() {
175173
}
176174

177175
func (s *deadlockDetector) close() {
178-
if !s.allPartitionsDeadlockDetector.closed {
179-
s.logger.Debug("Closing all partitions deadlock detector")
180-
s.allPartitionsDeadlockDetector.closed = true
181-
s.allPartitionsDeadlockDetector.done <- struct{}{}
182-
} else {
183-
s.logger.Debug("All partitions deadlock detector already closed")
176+
if s.allPartitionsDeadlockDetector.disabled {
177+
return
184178
}
179+
s.logger.Debug("Closing all partitions deadlock detector")
180+
s.allPartitionsDeadlockDetector.done <- struct{}{}
185181
}
186182

187183
func (s *allPartitionsDeadlockDetector) incrementMsgCount() {
184+
if s.disabled {
185+
return
186+
}
188187
atomic.AddUint64(s.msgConsumed, 1)
189188
}
190189

191190
func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} {
191+
if w.disabled {
192+
return nil
193+
}
192194
return w.closePartition
193195
}
194196

195197
func (w *partitionDeadlockDetector) close() {
196-
if !w.closed {
197-
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
198-
w.closed = true
199-
w.done <- struct{}{}
200-
} else {
201-
w.logger.Debug("Deadlock detector already closed", zap.Int32("partition", w.partition))
198+
if w.disabled {
199+
return
202200
}
201+
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
202+
w.done <- struct{}{}
203203
}
204204

205205
func (w *partitionDeadlockDetector) incrementMsgCount() {
206+
if w.disabled {
207+
return
208+
}
206209
w.incrementAllPartitionMsgCount()
207210
atomic.AddUint64(w.msgConsumed, 1)
208211
}

0 commit comments

Comments
 (0)