Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Add timeout delivery metrics (#1578)
Browse files Browse the repository at this point in the history
* Add timeout delivery metrics

* address comments

* address comments
  • Loading branch information
grac3gao-zz authored Aug 14, 2020
1 parent 5d0515d commit 5247c40
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/broker/handler/fanout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
}

expectMetrics.ExpectProcessing(t, t1.Name)
expectMetrics.ExpectTimeout(t, t1.Name)
expectMetrics.Expect200(t, t2.Name)
expectMetrics.Verify(t)
})
Expand Down
16 changes: 15 additions & 1 deletion pkg/broker/handler/processors/deliver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -149,15 +150,28 @@ func (p *Processor) deliver(ctx context.Context, target *config.Target, broker *
// Remove hops from forwarded event.
resp, err := p.sendMsg(ctx, target.Address, msg, transformer.DeleteExtension(eventutil.HopsAttribute))
if err != nil {
var result *url.Error
if errors.As(err, &result) && result.Timeout() {
// If the delivery is cancelled because of timeout, report event dispatch time without resp status code.
p.StatsReporter.ReportEventDispatchTime(ctx, time.Since(startTime))
}
return err
}

defer func() {
if err := resp.Body.Close(); err != nil {
logging.FromContext(ctx).Warn("failed to close response body", zap.Error(err))
}
}()

p.StatsReporter.ReportEventDispatchTime(ctx, time.Since(startTime), resp.StatusCode)
// Insert status code tag into context.
cctx, err := metrics.AddRespStatusCodeTags(ctx, resp.StatusCode)
if err != nil {
logging.FromContext(ctx).Error("failed to add status code tags to context", zap.Error(err))
}
// Report event dispatch time with resp status code.
p.StatsReporter.ReportEventDispatchTime(cctx, time.Since(startTime))

if resp.StatusCode/100 != 2 {
return fmt.Errorf("event delivery failed: HTTP status code %d", resp.StatusCode)
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/metrics/delivery_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (
"strconv"
"time"

"github.com/google/knative-gcp/pkg/broker/config"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics"

"github.com/google/knative-gcp/pkg/broker/config"
)

type DeliveryMetricsKey int
Expand Down Expand Up @@ -121,14 +122,9 @@ func NewDeliveryReporter(podName PodName, containerName ContainerName) (*Deliver
}

// ReportEventDispatchTime captures dispatch times.
func (r *DeliveryReporter) ReportEventDispatchTime(ctx context.Context, d time.Duration, responseCode int) {
func (r *DeliveryReporter) ReportEventDispatchTime(ctx context.Context, d time.Duration) {
// convert time.Duration in nanoseconds to milliseconds.
metrics.Record(ctx, r.dispatchTimeInMsecM.M(float64(d/time.Millisecond)),
stats.WithTags(
tag.Insert(ResponseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(ResponseCodeClassKey, metrics.ResponseCodeClass(responseCode)),
),
)
metrics.Record(ctx, r.dispatchTimeInMsecM.M(float64(d/time.Millisecond)))
}

// StartEventProcessing records the start of event processing for delivery within the given context.
Expand Down Expand Up @@ -169,6 +165,13 @@ func (r *DeliveryReporter) AddTags(ctx context.Context) (context.Context, error)
)
}

func AddRespStatusCodeTags(ctx context.Context, responseCode int) (context.Context, error) {
return tag.New(ctx,
tag.Insert(ResponseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(ResponseCodeClassKey, metrics.ResponseCodeClass(responseCode)),
)
}

func AddTargetTags(ctx context.Context, target *config.Target) (context.Context, error) {
return tag.New(ctx,
tag.Insert(NamespaceNameKey, target.Namespace),
Expand Down
20 changes: 15 additions & 5 deletions pkg/metrics/delivery_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ func TestReportEventDispatchTime(t *testing.T) {
if err != nil {
t.Fatal(err)
}
cctx, _ := AddRespStatusCodeTags(ctx, 202)
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 1100*time.Millisecond)
return nil
})
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 9100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 9100*time.Millisecond)
return nil
})
metricstest.CheckCountData(t, "event_count", wantTags, 2)
Expand Down Expand Up @@ -120,8 +121,17 @@ func TestReportEventProcessingTime(t *testing.T) {
reportertest.ExpectMetrics(t, func() error {
return r.reportEventProcessingTime(ctx, startTime.Add(9100*time.Millisecond))
})

// Test report event dispatch time without status code.
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond)
return nil
})
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 9100*time.Millisecond)
return nil
})
metricstest.CheckDistributionData(t, "event_processing_latencies", wantTags, 2, 1100.0, 9100.0)
metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0)
}

func TestMetricsWithEmptySourceAndTypeFilter(t *testing.T) {
Expand Down Expand Up @@ -154,9 +164,9 @@ func TestMetricsWithEmptySourceAndTypeFilter(t *testing.T) {
if err != nil {
t.Fatal(err)
}

cctx, _ := AddRespStatusCodeTags(ctx, 202)
reportertest.ExpectMetrics(t, func() error {
r.ReportEventDispatchTime(ctx, 1100*time.Millisecond, 202)
r.ReportEventDispatchTime(cctx, 1100*time.Millisecond)
return nil
})
metricstest.CheckCountData(t, "event_count", wantTags, 1)
Expand Down
51 changes: 50 additions & 1 deletion pkg/metrics/testing/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ type ExpectDelivery struct {
TriggerTags map[string]Tags
ProcessingCount map[string]int64
DeliveryCount map[deliveryKey]int64
TimeoutCount map[string]int64
}

func NewExpectDelivery() ExpectDelivery {
return ExpectDelivery{
TriggerTags: make(map[string]Tags),
ProcessingCount: make(map[string]int64),
DeliveryCount: make(map[deliveryKey]int64),
TimeoutCount: make(map[string]int64),
}
}

Expand Down Expand Up @@ -76,6 +78,13 @@ func (e ExpectDelivery) ExpectDelivery(t *testing.T, trigger string, code int) {
e.DeliveryCount[key] = e.DeliveryCount[key] + 1
}

func (e ExpectDelivery) ExpectTimeout(t *testing.T, trigger string) {
if _, ok := e.TriggerTags[trigger]; !ok {
t.Fatalf("trigger %q not defined", trigger)
}
e.TimeoutCount[trigger] = e.TimeoutCount[trigger] + 1
}

func (e ExpectDelivery) Expect200(t *testing.T, trigger string) {
e.ExpectProcessing(t, trigger)
e.ExpectDelivery(t, trigger, 200)
Expand Down Expand Up @@ -109,6 +118,9 @@ func (e ExpectDelivery) attemptVerify() error {
if err := e.verifyDelivery("event_dispatch_latencies"); err != nil {
return err
}
if err := e.verifyTimeout(); err != nil {
return err
}
return nil
}

Expand All @@ -127,7 +139,10 @@ func (e ExpectDelivery) verifyDelivery(viewName string) error {
if !ok {
return fmt.Errorf("missing trigger_name tag for row: %v", row)
}
if code, err := strconv.Atoi(tags["response_code"]); err != nil {
if tags["response_code"] == "" {
// Skip time out record which doesn't have response code.
continue
} else if code, err := strconv.Atoi(tags["response_code"]); err != nil {
return fmt.Errorf("invalid response code in tags: %v", tags)
} else {
if got[deliveryKey{Trigger: trigger, Code: code}], err = getCount(row); err != nil {
Expand Down Expand Up @@ -178,6 +193,40 @@ func (e ExpectDelivery) verifyProcessing() error {
return nil
}

func (e ExpectDelivery) verifyTimeout() error {
rows, err := view.RetrieveData("event_dispatch_latencies")
if err != nil {
return err
}
got := make(map[string]int64)
for _, row := range rows {
tags := make(map[string]string)
for _, t := range row.Tags {
tags[t.Key.Name()] = t.Value
}
trigger, ok := tags["trigger_name"]
if !ok {
return fmt.Errorf("missing trigger_name tag for row: %v", row)
}

if tags["response_code"] != "" {
continue
}

if diff := cmp.Diff(e.TriggerTags[trigger], Tags(tags)); diff != "" {
return fmt.Errorf("unexpected tags (-want, +got) = %v", diff)
}
if got[trigger], err = getCount(row); err != nil {
return err
}
}

if diff := cmp.Diff(e.TimeoutCount, got); diff != "" {
return fmt.Errorf("unexpected timeout event_dispatch_latencies measurement count (-want, +got) = %v", diff)
}
return nil
}

func getCount(row *view.Row) (int64, error) {
switch data := row.Data.(type) {
case *view.CountData:
Expand Down

0 comments on commit 5247c40

Please sign in to comment.