Skip to content

Commit

Permalink
fix: update sidecar locking to try to avoid sawtooth metric values. Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Aug 10, 2021
1 parent 9f98d90 commit f5b7751
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 11 deletions.
2 changes: 1 addition & 1 deletion runner/sidecar/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package sidecar

import "sync"

var mu = sync.Mutex{}
var mu = sync.RWMutex{} // mutex to guard step

func withLock(f func()) {
mu.Lock()
Expand Down
5 changes: 4 additions & 1 deletion runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ func patchStepStatus() {
}
}
}
if notEqual, patch := sharedutil.NotEqual(dfv1.Step{Status: lastStep.Status}, dfv1.Step{Status: step.Status}); notEqual {
mu.RLock()
notEqual, patch := sharedutil.NotEqual(dfv1.Step{Status: lastStep.Status}, dfv1.Step{Status: step.Status})
mu.RUnlock()
if notEqual {
logger.Info("patching step status", "patch", patch)
if un, err := dynamicInterface.
Resource(dfv1.StepGroupVersionResource).
Expand Down
26 changes: 20 additions & 6 deletions runner/sidecar/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,25 +127,39 @@ func newSourceMetrics(source dfv1.Source, sourceName string) {
Name: "pending",
Help: "Pending messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_pending",
ConstLabels: map[string]string{"sourceName": source.Name},
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetPending()) })
}, func() float64 {
mu.Lock()
defer mu.Unlock()
return float64(step.Status.SourceStatuses.Get(sourceName).GetPending())
})
promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "sources",
Name: "total",
Help: "Total number of messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_total",
ConstLabels: map[string]string{"sourceName": source.Name},
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetTotal()) })

}, func() float64 {
mu.RLock()
defer mu.RUnlock()
return float64(step.Status.SourceStatuses.Get(sourceName).GetTotal())
})
promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "sources",
Name: "errors",
Help: "Total number of errors, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_errors",
ConstLabels: map[string]string{"sourceName": source.Name},
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetErrors()) })

}, func() float64 {
mu.RLock()
defer mu.RUnlock()
return float64(step.Status.SourceStatuses.Get(sourceName).GetErrors())
})
promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "sources",
Name: "retries",
Help: "Number of retries, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_retries",
ConstLabels: map[string]string{"sourceName": source.Name},
}, func() float64 { return float64(step.Status.SourceStatuses.Get(sourceName).GetRetries()) })
}, func() float64 {
mu.RLock()
defer mu.RUnlock()
return float64(step.Status.SourceStatuses.Get(sourceName).GetRetries())
})
}
2 changes: 1 addition & 1 deletion test/http-stress/test-results.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"TestHTTPSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 700,
"TestHTTPSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 650,
"TestHTTPSinkStress/currentContext=docker-desktop,replicas=2,n=10000.tps": 400,
"TestHTTPSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 950,
"TestHTTPSinkStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 650,
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"TestKafkaSinkStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 550,
"TestKafkaSinkStress/replicas=1.tps": 100,
"TestKafkaSinkStress/replicas=2.tps": 250,
"TestKafkaSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1100,
"TestKafkaSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1000,
"TestKafkaSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 1700,
"TestKafkaSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 1550,
"TestKafkaSourceStress/replicas=1.tps": 100,
Expand Down
2 changes: 1 addition & 1 deletion test/stan-stress/test-results.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"TestStanSinkStress/replicas=3.tps": 300,
"TestStanSinkStress/replicas=4.tps": 300,
"TestStanSinkStress/replicas=5.tps": 350,
"TestStanSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1150,
"TestStanSourceStress/currentContext=docker-desktop,replicas=1,n=10000.tps": 1100,
"TestStanSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=1,n=10000.tps": 1500,
"TestStanSourceStress/currentContext=gke_jesse-sb_us-west2_cluster-2,replicas=2,n=10000.tps": 2850,
"TestStanSourceStress/replicas=1.tps": 550,
Expand Down

0 comments on commit f5b7751

Please sign in to comment.