From f7380441a79db00d5969f5f81c7fa7c7db8a9c38 Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Wed, 29 Sep 2021 09:23:52 -0700 Subject: [PATCH] fix: correct report of `missing` when multiple messages go missing --- runner/sidecar/monitor/impl.go | 11 ++++++++--- runner/sidecar/monitor/impl_test.go | 6 +++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/runner/sidecar/monitor/impl.go b/runner/sidecar/monitor/impl.go index 7fde5fd1..b083029c 100644 --- a/runner/sidecar/monitor/impl.go +++ b/runner/sidecar/monitor/impl.go @@ -79,12 +79,13 @@ func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partiti } lastOffset := i.db[key] expectedOffset := lastOffset + 1 - if offset < expectedOffset { + offsetDelta := offset - expectedOffset + if offsetDelta < 0 { duplicateCounter.WithLabelValues(sourceName).Inc() return false, nil } - if offset > expectedOffset { - missingCounter.WithLabelValues(sourceName).Inc() + if offsetDelta > 0 { + missingCounter.WithLabelValues(sourceName).Add(float64(offsetDelta)) } i.db[key] = offset return true, nil @@ -100,6 +101,10 @@ func (i *impl) commitOffsets(ctx context.Context) { } } +func (i *impl) Close(ctx context.Context) { + i.commitOffsets(ctx) +} + func New(ctx context.Context, pipelineName, stepName string) Interface { i := &impl{ sync.Mutex{}, diff --git a/runner/sidecar/monitor/impl_test.go b/runner/sidecar/monitor/impl_test.go index 54cfa75b..4d0ab55c 100644 --- a/runner/sidecar/monitor/impl_test.go +++ b/runner/sidecar/monitor/impl_test.go @@ -47,15 +47,15 @@ func Test_impl_Accept(t *testing.T) { assert.Equal(t, 0, missing(t)) }) t.Run("SkippedOffset", func(t *testing.T) { - accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 4) + accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 5) assert.NoError(t, err) assert.True(t, accept) assert.Equal(t, 1, duplicate(t)) - assert.Equal(t, 1, missing(t)) + assert.Equal(t, 2, missing(t)) }) thirtyDays := time.Hour * 24 * 30 rdb.On("Set", ctx, "my-pl/my-step/my-urn/1/offset", int64(1), thirtyDays).Return(nil) - rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(4), thirtyDays).Return(nil) + rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(5), thirtyDays).Return(nil) t.Run("CommitOffsets", func(t *testing.T) { i.commitOffsets(ctx) })