Skip to content

Commit

Permalink
fix: short-term fix for pending metric not being updated (#346)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Sep 18, 2021
1 parent 7425aa5 commit 657062a
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
1 change: 0 additions & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var (
ready = false // we are ready to serve HTTP requests, also updates pod status condition
kubernetesInterface kubernetes.Interface
secretInterface corev1.SecretInterface
prePatchHooks []func(ctx context.Context) error // hooks to run before patching
replica int
step dfv1.Step // this is updated on start, and then periodically as we update the status
stepName string
Expand Down
10 changes: 5 additions & 5 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

func connectSources(ctx context.Context, process func(context.Context, []byte) error) error {
Expand Down Expand Up @@ -158,17 +159,16 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
return sources[sourceName].Close()
})
if x, ok := sources[sourceName].(source.HasPending); ok && leadReplica() {
logger.Info("adding pre-patch hook", "source", sourceName)
prePatchHooks = append(prePatchHooks, func(ctx context.Context) error {
logger.Info("starting pending loop", "source", sourceName, "updateInterval", updateInterval.String())
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
logger.Info("getting pending", "source", sourceName)
if pending, err := x.GetPending(ctx); err != nil {
return err
logger.Error(err, "failed to get pending", "source", sourceName)
} else {
logger.Info("got pending", "source", sourceName, "pending", pending)
pendingGauge.WithLabelValues(sourceName).Set(float64(pending))
}
return nil
})
}, updateInterval, 1.2, true)
}
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions test/kafka-stress/kafka_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestKafkaSourceStress(t *testing.T) {
defer StartTPSReporter(t, "main", prefix, n)()

go PumpKafkaTopic(topic, n, prefix, Params.MessageSize)
WaitForPending()
WaitForTotalSunkMessages(n, Params.Timeout)
}

Expand Down Expand Up @@ -97,5 +98,6 @@ func TestKafkaSinkStress(t *testing.T) {
defer StartTPSReporter(t, "main", prefix, n)()

go PumpKafkaTopic(topic, n, prefix, Params.MessageSize)
WaitForPending()
WaitForTotalSunkMessages(n, Params.Timeout)
}
4 changes: 2 additions & 2 deletions test/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/prometheus/common/expfmt"
)

func WaitForNothingPending() {
ExpectMetric("pending", Eq(0))
func WaitForPending() {
ExpectMetric("pending", Gt(0))
}

func WaitForTotalSourceMessages(v int) {
Expand Down
2 changes: 2 additions & 0 deletions test/stan-stress/stan_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestStanSourceStress(t *testing.T) {
defer StartTPSReporter(t, "main", prefix, n)()

go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize)
WaitForPending()
WaitForTotalSunkMessages(n, Params.Timeout)
}

Expand Down Expand Up @@ -91,5 +92,6 @@ func TestStanSinkStress(t *testing.T) {
defer StartTPSReporter(t, "main", prefix, n)()

go PumpSTANSubject(longSubject, n, prefix, Params.MessageSize)
WaitForPending()
WaitForTotalSunkMessages(n, Params.Timeout)
}

0 comments on commit 657062a

Please sign in to comment.