Skip to content

Commit

Permalink
fix: correct report of missing when multiple messages go missing
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Sep 29, 2021
1 parent e55b950 commit f738044
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
11 changes: 8 additions & 3 deletions runner/sidecar/monitor/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partiti
}
lastOffset := i.db[key]
expectedOffset := lastOffset + 1
if offset < expectedOffset {
offsetDelta := offset - expectedOffset
if offsetDelta < 0 {
duplicateCounter.WithLabelValues(sourceName).Inc()
return false, nil
}
if offset > expectedOffset {
missingCounter.WithLabelValues(sourceName).Inc()
if offsetDelta > 0 {
missingCounter.WithLabelValues(sourceName).Add(float64(offsetDelta))
}
i.db[key] = offset
return true, nil
Expand All @@ -100,6 +101,10 @@ func (i *impl) commitOffsets(ctx context.Context) {
}
}

func (i *impl) Close(ctx context.Context) {
i.commitOffsets(ctx)
}

func New(ctx context.Context, pipelineName, stepName string) Interface {
i := &impl{
sync.Mutex{},
Expand Down
6 changes: 3 additions & 3 deletions runner/sidecar/monitor/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func Test_impl_Accept(t *testing.T) {
assert.Equal(t, 0, missing(t))
})
t.Run("SkippedOffset", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 4)
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 5)
assert.NoError(t, err)
assert.True(t, accept)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 1, missing(t))
assert.Equal(t, 2, missing(t))
})
thirtyDays := time.Hour * 24 * 30
rdb.On("Set", ctx, "my-pl/my-step/my-urn/1/offset", int64(1), thirtyDays).Return(nil)
rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(4), thirtyDays).Return(nil)
rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(5), thirtyDays).Return(nil)
t.Run("CommitOffsets", func(t *testing.T) {
i.commitOffsets(ctx)
})
Expand Down

0 comments on commit f738044

Please sign in to comment.