diff --git a/metrics_batcher.go b/metrics_batcher.go index ccd6ee4..628471e 100644 --- a/metrics_batcher.go +++ b/metrics_batcher.go @@ -17,6 +17,8 @@ package stackdriver import ( "context" "fmt" + "regexp" + "strconv" "strings" "sync" "time" @@ -131,16 +133,45 @@ func (mb *metricsBatcher) sendReqToChan() { mb.reqsChan <- req } +// regex to extract min-max ranges from error response strings in the format "timeSeries[(min-max,...)] ..." (max is optional) +var timeSeriesErrRegex = regexp.MustCompile(`: timeSeries\[([0-9]+(?:-[0-9]+)?(?:,[0-9]+(?:-[0-9]+)?)*)\]`) + // sendReq sends create time series requests to Stackdriver, // and returns the count of dropped time series and error. func sendReq(ctx context.Context, c *monitoring.MetricClient, req *monitoringpb.CreateTimeSeriesRequest) (int, error) { - if c != nil { // c==nil only happens in unit tests where we don't make real calls to Stackdriver server - err := createTimeSeries(ctx, c, req) - if err != nil { - return len(req.TimeSeries), err + // c == nil only happens in unit tests where we don't make real calls to Stackdriver server + if c == nil { + return 0, nil + } + + err := createTimeSeries(ctx, c, req) + if err == nil { + return 0, nil + } + + droppedTimeSeriesRangeMatches := timeSeriesErrRegex.FindAllStringSubmatch(err.Error(), -1) + if !strings.HasPrefix(err.Error(), "One or more TimeSeries could not be written:") || len(droppedTimeSeriesRangeMatches) == 0 { + return len(req.TimeSeries), err + } + + dropped := 0 + for _, submatches := range droppedTimeSeriesRangeMatches { + for i := 1; i < len(submatches); i++ { + for _, rng := range strings.Split(submatches[i], ",") { + rngSlice := strings.Split(rng, "-") + + // strconv errors not possible due to regex above + min, _ := strconv.Atoi(rngSlice[0]) + max := min + if len(rngSlice) > 1 { + max, _ = strconv.Atoi(rngSlice[1]) + } + + dropped += max - min + 1 + } } } - return 0, nil + return dropped, err } type worker struct { diff --git a/metrics_batcher_test.go b/metrics_batcher_test.go index b7cc4cf..e8fdc20 100644 --- a/metrics_batcher_test.go +++ b/metrics_batcher_test.go @@ -16,6 +16,7 @@ package stackdriver import ( "context" + "errors" "fmt" "testing" @@ -109,3 +110,73 @@ func makeTs(i int) *monitoringpb.TimeSeries { }, } } + +func TestSendReqAndParseDropped(t *testing.T) { + type testCase struct { + name string + timeseriesCount int + createTimeSeriesFunc func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error + expectedErr bool + expectedDropped int + } + + testCases := []testCase{ + { + name: "No error", + timeseriesCount: 75, + createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { + return nil + }, + expectedErr: false, + expectedDropped: 0, + }, + { + name: "Partial error", + timeseriesCount: 75, + createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { + return errors.New("One or more TimeSeries could not be written: Internal error encountered. Please retry after a few seconds. If internal errors persist, contact support at https://cloud.google.com/support/docs.: timeSeries[0-16,25-44,46-74]; Unknown metric: agent.googleapis.com/system.swap.page_faults: timeSeries[45]") + }, + expectedErr: true, + expectedDropped: 67, + }, + { + name: "Incorrectly formatted error", + timeseriesCount: 75, + createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { + return errors.New("One or more TimeSeries could not be written: Internal error encountered. Please retry after a few seconds. If internal errors persist, contact support at https://cloud.google.com/support/docs.: timeSeries[0-16,25-44,,46-74]; Unknown metric: agent.googleapis.com/system.swap.page_faults: timeSeries[45x]") + }, + expectedErr: true, + expectedDropped: 75, + }, + { + name: "Unexpected error format", + timeseriesCount: 75, + createTimeSeriesFunc: func(ctx context.Context, c *monitoring.MetricClient, ts *monitoringpb.CreateTimeSeriesRequest) error { + return errors.New("err1") + }, + expectedErr: true, + expectedDropped: 75, + }, + } + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + persistedCreateTimeSeries := createTimeSeries + createTimeSeries = test.createTimeSeriesFunc + + mc, _ := monitoring.NewMetricClient(context.Background()) + d, err := sendReq(context.Background(), mc, &monitoringpb.CreateTimeSeriesRequest{TimeSeries: make([]*monitoringpb.TimeSeries, test.timeseriesCount)}) + if !test.expectedErr && err != nil { + t.Fatal("Expected no err") + } + if test.expectedErr && err == nil { + t.Fatal("Expected noerr") + } + if d != test.expectedDropped { + t.Fatalf("Want %v dropped, got %v", test.expectedDropped, d) + } + + createTimeSeries = persistedCreateTimeSeries + }) + } +}