Skip to content

Commit

Permalink
fix: total/error calcs
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 4, 2021
1 parent 06fc5f0 commit 421c142
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
13 changes: 13 additions & 0 deletions api/v1alpha1/source_status.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package v1alpha1

import "strconv"

type SourceStatus struct {
LastMessage *Message `json:"lastMessage,omitempty" protobuf:"bytes,2,opt,name=lastMessage"`
LastError *Error `json:"lastError,omitempty" protobuf:"bytes,5,opt,name=lastError"`
Pending *uint64 `json:"pending,omitempty" protobuf:"varint,3,opt,name=pending"`
Metrics map[string]Metrics `json:"metrics,omitempty" protobuf:"bytes,4,rep,name=metrics"`
}

func (m SourceStatus) GetMetrics(replica int) Metrics {
if x, ok := m.Metrics[strconv.Itoa(replica)]; ok {
return x
}
return Metrics{}
}

func (m SourceStatus) GetPending() uint64 {
if m.Pending != nil {
return *m.Pending
Expand All @@ -30,6 +39,10 @@ func (in SourceStatus) GetErrors() uint64 {
return x
}

func (in SourceStatus) AnySunk() bool {
return in.GetTotal() > 0
}

func (in SourceStatus) AnyErrors() bool {
return in.GetErrors() > 0
}
16 changes: 10 additions & 6 deletions api/v1alpha1/source_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ func (in SourceStatuses) Set(name string, replica int, msg string, rate resource
in[name] = x
}

func (in SourceStatuses) GetTotal() uint64 {
var x uint64
for _, s := range in {
x += s.GetTotal()
func (in SourceStatuses) Get(name string) SourceStatus {
if x, ok := in[name]; ok {
return x
}
return x
return SourceStatus{}
}

func (in SourceStatuses) IncErrors(name string, replica int, err error) {
Expand Down Expand Up @@ -70,5 +69,10 @@ func (in SourceStatuses) AnyErrors() bool {
}

func (in SourceStatuses) AnySunk() bool {
return in.GetTotal() > 0
for _, s := range in {
if s.AnySunk() {
return true
}
}
return false
}
8 changes: 4 additions & 4 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,27 +292,27 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
rateCounter := ratecounter.NewRateCounter(updateInterval)

labels := map[string]string{"sourceName": source.Name, "replica": strconv.Itoa(replica)}
if replica == 0 {
if replica == 0 { // only replica zero updates this value, so it the only replica that can be accurate
promauto.NewCounterFunc(prometheus.CounterOpts{
Name: "pending",
Subsystem: "sources",
Help: "Pending messages",
ConstLabels: labels,
}, func() float64 { return float64(status.SourceStatuses[sourceName].GetPending()) })
}, func() float64 { return float64(status.SourceStatuses.Get(sourceName).GetPending()) })
}
promauto.NewCounterFunc(prometheus.CounterOpts{
Name: "total",
Subsystem: "sources",
Help: "Total number of messages",
ConstLabels: labels,
}, func() float64 { return float64(status.SourceStatuses[sourceName].GetTotal()) })
}, func() float64 { return float64(status.SourceStatuses.Get(sourceName).GetMetrics(replica).Total) })

promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "sources",
Name: "errors",
Help: "Total number of errors",
ConstLabels: labels,
}, func() float64 { return float64(status.SourceStatuses[sourceName].GetErrors()) })
}, func() float64 { return float64(status.SourceStatuses.Get(sourceName).GetMetrics(replica).Errors) })

f := func(msg []byte) error {
rateCounter.Incr(1)
Expand Down

0 comments on commit 421c142

Please sign in to comment.