Skip to content

Commit 45a40d0

Browse files
author
Chodor Marek
committed
Configurable deadlock detector interval for ingester. Value of 0 disables deadlock_detector. #issue1225
Signed-off-by: Marek Chodor <[email protected]>
1 parent 4a07b78 commit 45a40d0

File tree

6 files changed

+129
-32
lines changed

6 files changed

+129
-32
lines changed

Diff for: cmd/ingester/app/builder/builder.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
7070
}
7171

7272
consumerParams := consumer.Params{
73-
InternalConsumer: saramaConsumer,
74-
ProcessorFactory: *processorFactory,
75-
Factory: metricsFactory,
76-
Logger: logger,
73+
InternalConsumer: saramaConsumer,
74+
ProcessorFactory: *processorFactory,
75+
Factory: metricsFactory,
76+
Logger: logger,
77+
DeadlockCheckInterval: options.DeadlockInterval,
7778
}
7879
return consumer.New(consumerParams)
7980
}

Diff for: cmd/ingester/app/consumer/consumer.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ import (
2929

3030
// Params are the parameters of a Consumer
3131
type Params struct {
32-
ProcessorFactory ProcessorFactory
33-
Factory metrics.Factory
34-
Logger *zap.Logger
35-
InternalConsumer consumer.Consumer
32+
ProcessorFactory ProcessorFactory
33+
Factory metrics.Factory
34+
Logger *zap.Logger
35+
InternalConsumer consumer.Consumer
36+
DeadlockCheckInterval time.Duration
3637
}
3738

3839
// Consumer uses sarama to consume and handle messages from kafka
@@ -55,7 +56,7 @@ type consumerState struct {
5556

5657
// New is a constructor for a Consumer
5758
func New(params Params) (*Consumer, error) {
58-
deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute)
59+
deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, params.DeadlockCheckInterval)
5960
return &Consumer{
6061
metricsFactory: params.Factory,
6162
logger: params.Logger,

Diff for: cmd/ingester/app/consumer/deadlock_detector.go

+45-21
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,14 @@ type partitionDeadlockDetector struct {
5252
closePartition chan struct{}
5353
done chan struct{}
5454
incrementAllPartitionMsgCount func()
55+
closed bool
5556
}
5657

5758
type allPartitionsDeadlockDetector struct {
5859
msgConsumed *uint64
5960
logger *zap.Logger
6061
done chan struct{}
62+
closed bool
6163
}
6264

6365
func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
@@ -87,13 +89,19 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti
8789
closePartition: make(chan struct{}, 1),
8890
done: make(chan struct{}),
8991
logger: s.logger,
92+
closed: false,
9093

9194
incrementAllPartitionMsgCount: func() {
9295
s.allPartitionsDeadlockDetector.incrementMsgCount()
9396
},
9497
}
9598

96-
go s.monitorForPartition(w, partition)
99+
if s.interval == 0 {
100+
s.logger.Debug("Partition deadlock detector disabled")
101+
w.closed = true
102+
} else {
103+
go s.monitorForPartition(w, partition)
104+
}
97105

98106
return w
99107
}
@@ -135,34 +143,45 @@ func (s *deadlockDetector) start() {
135143
msgConsumed: &msgConsumed,
136144
done: make(chan struct{}),
137145
logger: s.logger,
146+
closed: false,
138147
}
139148

140-
go func() {
149+
if s.interval == 0 {
150+
s.logger.Debug("Global deadlock detector disabled")
151+
detector.closed = true
152+
} else {
141153
s.logger.Debug("Starting global deadlock detector")
142-
ticker := time.NewTicker(s.interval)
143-
defer ticker.Stop()
144-
145-
for {
146-
select {
147-
case <-detector.done:
148-
s.logger.Debug("Closing global ticker routine")
149-
return
150-
case <-ticker.C:
151-
if atomic.LoadUint64(detector.msgConsumed) == 0 {
152-
s.panicFunc(-1)
153-
return // For tests
154+
go func() {
155+
ticker := time.NewTicker(s.interval)
156+
defer ticker.Stop()
157+
158+
for {
159+
select {
160+
case <-detector.done:
161+
s.logger.Debug("Closing global ticker routine")
162+
return
163+
case <-ticker.C:
164+
if atomic.LoadUint64(detector.msgConsumed) == 0 {
165+
s.panicFunc(-1)
166+
return // For tests
167+
}
168+
atomic.StoreUint64(detector.msgConsumed, 0)
154169
}
155-
atomic.StoreUint64(detector.msgConsumed, 0)
156170
}
157-
}
158-
}()
171+
}()
172+
}
159173

160174
s.allPartitionsDeadlockDetector = detector
161175
}
162176

163177
func (s *deadlockDetector) close() {
164-
s.logger.Debug("Closing all partitions deadlock detector")
165-
s.allPartitionsDeadlockDetector.done <- struct{}{}
178+
if !s.allPartitionsDeadlockDetector.closed {
179+
s.logger.Debug("Closing all partitions deadlock detector")
180+
s.allPartitionsDeadlockDetector.closed = true
181+
s.allPartitionsDeadlockDetector.done <- struct{}{}
182+
} else {
183+
s.logger.Debug("All partitions deadlock detector already closed")
184+
}
166185
}
167186

168187
func (s *allPartitionsDeadlockDetector) incrementMsgCount() {
@@ -174,8 +193,13 @@ func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} {
174193
}
175194

176195
func (w *partitionDeadlockDetector) close() {
177-
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
178-
w.done <- struct{}{}
196+
if !w.closed {
197+
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
198+
w.closed = true
199+
w.done <- struct{}{}
200+
} else {
201+
w.logger.Debug("Deadlock detector already closed", zap.Int32("partition", w.partition))
202+
}
179203
}
180204

181205
func (w *partitionDeadlockDetector) incrementMsgCount() {

Diff for: cmd/ingester/app/consumer/deadlock_detector_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,53 @@ func TestGlobalPanic(t *testing.T) {
112112
d.start()
113113
wg.Wait()
114114
}
115+
116+
func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) {
117+
l, _ := zap.NewDevelopment()
118+
d := deadlockDetector{
119+
metricsFactory: metrics.NewLocalFactory(0),
120+
logger: l,
121+
interval: 0,
122+
panicFunc: func(partition int32) {
123+
t.Errorf("Should not panic when deadlock detector is disabled")
124+
},
125+
}
126+
127+
d.start()
128+
129+
time.Sleep(100 * time.Millisecond)
130+
131+
d.close()
132+
}
133+
134+
func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) {
135+
l, _ := zap.NewDevelopment()
136+
d := deadlockDetector{
137+
metricsFactory: metrics.NewLocalFactory(0),
138+
logger: l,
139+
interval: 0,
140+
panicFunc: func(partition int32) {
141+
t.Errorf("Should not panic when deadlock detector is disabled")
142+
},
143+
}
144+
145+
w := d.startMonitoringForPartition(1)
146+
time.Sleep(100 * time.Millisecond)
147+
148+
w.close()
149+
}
150+
151+
//same as TestNoClosingSignalIfMessagesProcessedInInterval but with disabled deadlock detector
152+
func TestApiCompatibilityWhenDeadlockDetectorDisabled(t *testing.T) {
153+
mf := metrics.NewLocalFactory(0)
154+
l, _ := zap.NewDevelopment()
155+
f := newDeadlockDetector(mf, l, 0)
156+
f.start()
157+
defer f.close()
158+
159+
w := f.startMonitoringForPartition(1)
160+
161+
w.incrementMsgCount()
162+
assert.Zero(t, len(w.closePartitionChannel()))
163+
w.close()
164+
}

Diff for: cmd/ingester/app/flags.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"strconv"
2121
"strings"
22+
"time"
2223

2324
"github.com/spf13/viper"
2425

@@ -43,6 +44,8 @@ const (
4344
SuffixParallelism = ".parallelism"
4445
// SuffixEncoding is a suffix for the encoding flag
4546
SuffixEncoding = ".encoding"
47+
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
48+
SuffixDeadlockInterval = ".deadlockInterval"
4649

4750
// DefaultBroker is the default kafka broker
4851
DefaultBroker = "127.0.0.1:9092"
@@ -54,13 +57,16 @@ const (
5457
DefaultParallelism = 1000
5558
// DefaultEncoding is the default span encoding
5659
DefaultEncoding = EncodingProto
60+
// DefaultDeadlockInterval is the default deadlock interval
61+
DefaultDeadlockInterval = 1 * time.Minute
5762
)
5863

5964
// Options stores the configuration options for the Ingester
6065
type Options struct {
6166
kafkaConsumer.Configuration
62-
Parallelism int
63-
Encoding string
67+
Parallelism int
68+
Encoding string
69+
DeadlockInterval time.Duration
6470
}
6571

6672
// AddFlags adds flags for Builder
@@ -85,6 +91,10 @@ func AddFlags(flagSet *flag.FlagSet) {
8591
ConfigPrefix+SuffixEncoding,
8692
DefaultEncoding,
8793
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
94+
flagSet.String(
95+
ConfigPrefix+SuffixDeadlockInterval,
96+
DefaultDeadlockInterval.String(),
97+
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
8898
}
8999

90100
// InitFromViper initializes Builder with properties from viper
@@ -94,4 +104,11 @@ func (o *Options) InitFromViper(v *viper.Viper) {
94104
o.GroupID = v.GetString(ConfigPrefix + SuffixGroupID)
95105
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
96106
o.Encoding = v.GetString(ConfigPrefix + SuffixEncoding)
107+
108+
d, err := time.ParseDuration(v.GetString(ConfigPrefix + SuffixDeadlockInterval))
109+
if err != nil {
110+
o.DeadlockInterval = DefaultDeadlockInterval
111+
} else {
112+
o.DeadlockInterval = d
113+
}
97114
}

Diff for: cmd/ingester/app/flags_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package app
1616

1717
import (
1818
"testing"
19+
"time"
1920

2021
"github.com/stretchr/testify/assert"
2122

@@ -30,13 +31,15 @@ func TestOptionsWithFlags(t *testing.T) {
3031
"--ingester.brokers=127.0.0.1:9092,0.0.0:1234",
3132
"--ingester.group-id=group1",
3233
"--ingester.parallelism=5",
34+
"--ingester.deadlockInterval=2m",
3335
"--ingester.encoding=json"})
3436
o.InitFromViper(v)
3537

3638
assert.Equal(t, "topic1", o.Topic)
3739
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
3840
assert.Equal(t, "group1", o.GroupID)
3941
assert.Equal(t, 5, o.Parallelism)
42+
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
4043
assert.Equal(t, EncodingJSON, o.Encoding)
4144
}
4245

@@ -51,4 +54,5 @@ func TestFlagDefaults(t *testing.T) {
5154
assert.Equal(t, DefaultGroupID, o.GroupID)
5255
assert.Equal(t, DefaultParallelism, o.Parallelism)
5356
assert.Equal(t, DefaultEncoding, o.Encoding)
57+
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
5458
}

0 commit comments

Comments
 (0)