Skip to content

Commit

Permalink
fix: enhance missing/duplicate monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Sep 30, 2021
1 parent 14d0286 commit 2ac7baf
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 20 deletions.
13 changes: 5 additions & 8 deletions runner/sidecar/monitor/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,16 @@ type impl struct {
storage storage
}

func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) (bool, error) {
func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) bool {
i.mu.Lock()
defer i.mu.Unlock()
key := fmt.Sprintf("%s/%s/%s/%d/offset", i.pipelineName, i.stepName, sourceURN, partition)
if _, ok := i.db[key]; !ok {
text, _ := i.storage.Get(ctx, key)
if text == "" { // assume that this is the first time, and we are continuous
lastOffset, err := strconv.ParseInt(text, 10, 64)
if err != nil {
i.db[key] = offset - 1
} else {
lastOffset, err := strconv.ParseInt(text, 10, 64)
if err != nil {
return false, err
}
i.db[key] = lastOffset
}
}
Expand All @@ -82,13 +79,13 @@ func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partiti
offsetDelta := offset - expectedOffset
if offsetDelta < 0 {
duplicateCounter.WithLabelValues(sourceName).Inc()
return false, nil
return false
}
if offsetDelta > 0 {
missingCounter.WithLabelValues(sourceName).Add(float64(offsetDelta))
}
i.db[key] = offset
return true, nil
return true
}

func (i *impl) commitOffsets(ctx context.Context) {
Expand Down
12 changes: 4 additions & 8 deletions runner/sidecar/monitor/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,25 @@ func Test_impl_Accept(t *testing.T) {
storage: rdb,
}
t.Run("EmptyStorage", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 1, 1)
assert.NoError(t, err)
accept := i.Accept(ctx, "my-source", "my-urn", 1, 1)
assert.True(t, accept)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("ExistingStorage", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.NoError(t, err)
accept := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.True(t, accept)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("RepeatedOffset", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.NoError(t, err)
accept := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.False(t, accept)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("SkippedOffset", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 5)
assert.NoError(t, err)
accept := i.Accept(ctx, "my-source", "my-urn", 2, 5)
assert.True(t, accept)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 2, missing(t))
Expand Down
4 changes: 3 additions & 1 deletion runner/sidecar/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ import (
)

type Interface interface {
Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) (bool, error)
// Accept determine if the message should be processed. It is not a duplicate.
Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) bool
Close(context.Context)
}
4 changes: 1 addition & 3 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
logger.Info("starting consuming claim", "partition", claim.Partition())
defer sess.Commit()
for msg := range claim.Messages() {
if ok, err := h.mntr.Accept(ctx, h.sourceName, h.sourceURN, msg.Partition, msg.Offset); err != nil {
logger.Error(err, "failed to determine if we should accept the message")
} else if !ok {
if !h.mntr.Accept(ctx, h.sourceName, h.sourceURN, msg.Partition, msg.Offset) {
continue
}
if err := h.processMessage(ctx, msg); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
}

mntr := monitor.New(ctx, pipelineName, stepName)
addStopHook(func(ctx context.Context) error {
logger.Info("closing monitor")
mntr.Close(ctx)
logger.Info("monitor closed")
return nil
})
sources := make(map[string]source.Interface)
for _, s := range step.Spec.Sources {
sourceName := s.Name
Expand Down

0 comments on commit 2ac7baf

Please sign in to comment.