Skip to content

Commit

Permalink
Configurable deadlock detector interval for ingester. (resubmit) (#1134)
Browse files Browse the repository at this point in the history
* Configurable deadlock detector interval for ingester.
Value of 0 disables deadlock_detector. #issue1225

Signed-off-by: Chodor Marek <[email protected]>

* Making channels for deadlock detector only when it is enabled

Signed-off-by: Chodor Marek <[email protected]>

* Review fixes/improvements

Signed-off-by: Chodor Marek <[email protected]>
  • Loading branch information
marqc authored and pavolloffay committed Nov 12, 2018
1 parent be62816 commit b2bcba5
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 26 deletions.
9 changes: 5 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
}

consumerParams := consumer.Params{
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
MetricsFactory: metricsFactory,
Logger: logger,
InternalConsumer: saramaConsumer,
ProcessorFactory: *processorFactory,
MetricsFactory: metricsFactory,
Logger: logger,
DeadlockCheckInterval: options.DeadlockInterval,
}
return consumer.New(consumerParams)
}
11 changes: 6 additions & 5 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ import (

// Params are the parameters of a Consumer
type Params struct {
ProcessorFactory ProcessorFactory
MetricsFactory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
ProcessorFactory ProcessorFactory
MetricsFactory metrics.Factory
Logger *zap.Logger
InternalConsumer consumer.Consumer
DeadlockCheckInterval time.Duration
}

// Consumer uses sarama to consume and handle messages from kafka
Expand All @@ -56,7 +57,7 @@ type consumerState struct {

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, time.Minute)
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
Expand Down
52 changes: 35 additions & 17 deletions cmd/ingester/app/consumer/deadlock_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ type partitionDeadlockDetector struct {
closePartition chan struct{}
done chan struct{}
incrementAllPartitionMsgCount func()
disabled bool
}

type allPartitionsDeadlockDetector struct {
msgConsumed *uint64
logger *zap.Logger
done chan struct{}
disabled bool
}

func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
Expand Down Expand Up @@ -87,13 +89,18 @@ func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partiti
closePartition: make(chan struct{}, 1),
done: make(chan struct{}),
logger: s.logger,
disabled: 0 == s.interval,

incrementAllPartitionMsgCount: func() {
s.allPartitionsDeadlockDetector.incrementMsgCount()
},
}

go s.monitorForPartition(w, partition)
if w.disabled {
s.logger.Debug("Partition deadlock detector disabled")
} else {
go s.monitorForPartition(w, partition)
}

return w
}
Expand Down Expand Up @@ -135,32 +142,40 @@ func (s *deadlockDetector) start() {
msgConsumed: &msgConsumed,
done: make(chan struct{}),
logger: s.logger,
disabled: 0 == s.interval,
}

go func() {
if detector.disabled {
s.logger.Debug("Global deadlock detector disabled")
} else {
s.logger.Debug("Starting global deadlock detector")
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-detector.done:
s.logger.Debug("Closing global ticker routine")
return
case <-ticker.C:
if atomic.LoadUint64(detector.msgConsumed) == 0 {
s.panicFunc(-1)
return // For tests
go func() {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()

for {
select {
case <-detector.done:
s.logger.Debug("Closing global ticker routine")
return
case <-ticker.C:
if atomic.LoadUint64(detector.msgConsumed) == 0 {
s.panicFunc(-1)
return // For tests
}
atomic.StoreUint64(detector.msgConsumed, 0)
}
atomic.StoreUint64(detector.msgConsumed, 0)
}
}
}()
}()
}

s.allPartitionsDeadlockDetector = detector
}

func (s *deadlockDetector) close() {
if s.allPartitionsDeadlockDetector.disabled {
return
}
s.logger.Debug("Closing all partitions deadlock detector")
s.allPartitionsDeadlockDetector.done <- struct{}{}
}
Expand All @@ -174,6 +189,9 @@ func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} {
}

func (w *partitionDeadlockDetector) close() {
if w.disabled {
return
}
w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
w.done <- struct{}{}
}
Expand Down
51 changes: 51 additions & 0 deletions cmd/ingester/app/consumer/deadlock_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,54 @@ func TestGlobalPanic(t *testing.T) {
d.start()
wg.Wait()
}

func TestNoGlobalPanicIfDeadlockDetectorDisabled(t *testing.T) {
l, _ := zap.NewDevelopment()
d := deadlockDetector{
metricsFactory: metrics.NewLocalFactory(0),
logger: l,
interval: 0,
panicFunc: func(partition int32) {
t.Errorf("Should not panic when deadlock detector is disabled")
},
}

d.start()

time.Sleep(100 * time.Millisecond)

d.close()
}

func TestNoPanicForPartitionIfDeadlockDetectorDisabled(t *testing.T) {
l, _ := zap.NewDevelopment()
d := deadlockDetector{
metricsFactory: metrics.NewLocalFactory(0),
logger: l,
interval: 0,
panicFunc: func(partition int32) {
t.Errorf("Should not panic when deadlock detector is disabled")
},
}

w := d.startMonitoringForPartition(1)
time.Sleep(100 * time.Millisecond)

w.close()
}

//same as TestNoClosingSignalIfMessagesProcessedInInterval but with disabled deadlock detector
func TestApiCompatibilityWhenDeadlockDetectorDisabled(t *testing.T) {
mf := metrics.NewLocalFactory(0)
l, _ := zap.NewDevelopment()
f := newDeadlockDetector(mf, l, 0)
f.start()
defer f.close()

w := f.startMonitoringForPartition(1)

w.incrementMsgCount()
w.incrementAllPartitionMsgCount()
assert.Zero(t, len(w.closePartitionChannel()))
w.close()
}
12 changes: 12 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/spf13/viper"

Expand All @@ -43,6 +44,8 @@ const (
SuffixGroupID = ".group-id"
// SuffixEncoding is a suffix for the encoding flag
SuffixEncoding = ".encoding"
// SuffixDeadlockInterval is a suffix for deadlock detecor flag
SuffixDeadlockInterval = ".deadlockInterval"
// SuffixParallelism is a suffix for the parallelism flag
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
Expand All @@ -58,6 +61,8 @@ const (
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
DefaultHTTPPort = 14271
// IngesterDefaultHealthCheckHTTPPort is the default HTTP Port for health check
Expand All @@ -71,6 +76,7 @@ type Options struct {
Encoding string
// IngesterHTTPPort is the port that the ingester service listens in on for http requests
IngesterHTTPPort int
DeadlockInterval time.Duration
}

// AddFlags adds flags for Builder
Expand Down Expand Up @@ -99,6 +105,10 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixHTTPPort,
DefaultHTTPPort,
"The http port for the ingester service")
flagSet.Duration(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")

}

Expand All @@ -110,4 +120,6 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.Encoding = v.GetString(KafkaConfigPrefix + SuffixEncoding)
o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.IngesterHTTPPort = v.GetInt(ConfigPrefix + SuffixHTTPPort)

o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
}
4 changes: 4 additions & 0 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package app

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -31,13 +32,15 @@ func TestOptionsWithFlags(t *testing.T) {
"--kafka.group-id=group1",
"--kafka.encoding=json",
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
"--ingester.http-port=2345"})
o.InitFromViper(v)

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
assert.Equal(t, "group1", o.GroupID)
assert.Equal(t, 5, o.Parallelism)
assert.Equal(t, 2*time.Minute, o.DeadlockInterval)
assert.Equal(t, EncodingJSON, o.Encoding)
assert.Equal(t, 2345, o.IngesterHTTPPort)
}
Expand All @@ -53,4 +56,5 @@ func TestFlagDefaults(t *testing.T) {
assert.Equal(t, DefaultGroupID, o.GroupID)
assert.Equal(t, DefaultParallelism, o.Parallelism)
assert.Equal(t, DefaultEncoding, o.Encoding)
assert.Equal(t, DefaultDeadlockInterval, o.DeadlockInterval)
}

0 comments on commit b2bcba5

Please sign in to comment.