diff --git a/cmd/pipecd/BUILD.bazel b/cmd/pipecd/BUILD.bazel index 2c8067bd87..d5a28095fe 100644 --- a/cmd/pipecd/BUILD.bazel +++ b/cmd/pipecd/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/app/api/service/webservice:go_default_library", "//pkg/app/api/stagelogstore:go_default_library", "//pkg/app/ops/handler:go_default_library", + "//pkg/app/ops/insightcollector:go_default_library", "//pkg/cache/rediscache:go_default_library", "//pkg/cli:go_default_library", "//pkg/config:go_default_library", @@ -38,6 +39,7 @@ go_library( "//pkg/rpc:go_default_library", "//pkg/version:go_default_library", "@com_github_dgrijalva_jwt_go//:go_default_library", + "@com_github_robfig_cron_v3//:go_default_library", "@com_github_nytimes_gziphandler//:go_default_library", "@com_github_spf13_cobra//:go_default_library", "@org_golang_x_sync//errgroup:go_default_library", diff --git a/cmd/pipecd/ops.go b/cmd/pipecd/ops.go index 2fb5d57bc7..fb59e6440d 100644 --- a/cmd/pipecd/ops.go +++ b/cmd/pipecd/ops.go @@ -19,22 +19,25 @@ import ( "net/http" "time" + "github.com/robfig/cron/v3" "github.com/spf13/cobra" "go.uber.org/zap" "golang.org/x/sync/errgroup" "github.com/pipe-cd/pipe/pkg/admin" "github.com/pipe-cd/pipe/pkg/app/ops/handler" + "github.com/pipe-cd/pipe/pkg/app/ops/insightcollector" "github.com/pipe-cd/pipe/pkg/cli" "github.com/pipe-cd/pipe/pkg/datastore" "github.com/pipe-cd/pipe/pkg/version" ) type ops struct { - httpPort int - adminPort int - gracePeriod time.Duration - configFile string + httpPort int + adminPort int + gracePeriod time.Duration + enableInsightCollector bool + configFile string } func NewOpsCommand() *cobra.Command { @@ -51,7 +54,7 @@ func NewOpsCommand() *cobra.Command { cmd.Flags().IntVar(&s.httpPort, "http-port", s.httpPort, "The port number used to run http server.") cmd.Flags().IntVar(&s.adminPort, "admin-port", s.adminPort, "The port number used to run a HTTP server for admin tasks such as metrics, healthz.") cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.") - + cmd.Flags().BoolVar(&s.enableInsightCollector, "enableInsightCollector-insight-collector", s.enableInsightCollector, "Enable insight collector.") cmd.Flags().StringVar(&s.configFile, "config-file", s.configFile, "The path to the configuration file.") return cmd } @@ -82,6 +85,41 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error { } }() + fs, err := createFilestore(ctx, cfg, t.Logger) + if err != nil { + t.Logger.Error("failed to create filestore", zap.Error(err)) + return err + } + defer func() { + if err := fs.Close(); err != nil { + t.Logger.Error("failed to close filestore client", zap.Error(err)) + } + }() + + // Starting a cron job for insight collector. + if s.enableInsightCollector { + collector := insightcollector.NewInsightCollector(ds, fs, t.Logger) + c := cron.New(cron.WithLocation(time.UTC)) + _, err := c.AddFunc(cfg.InsightCollector.Schedule, func() { + start := time.Now() + if err := collector.CollectProjectsInsight(ctx); err != nil { + t.Logger.Error("failed to run the project insight collector", zap.Error(err)) + } else { + t.Logger.Info("project insight collector successfully finished", zap.Duration("duration", time.Since(start))) + } + + start = time.Now() + if err := collector.CollectApplicationInsight(ctx); err != nil { + t.Logger.Error("failed to run the application insight collector", zap.Error(err)) + } else { + t.Logger.Info("application insight collector successfully finished", zap.Duration("duration", time.Since(start))) + } + }) + if err != nil { + t.Logger.Error("failed to configure the insight collector", zap.Error(err)) + } + } + // Start running HTTP server. { handler := handler.NewHandler(s.httpPort, datastore.NewProjectStore(ds), cfg.SharedSSOConfigs, s.gracePeriod, t.Logger) diff --git a/go.mod b/go.mod index 5ae99cecd3..7d34b4a09c 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/prometheus/client_golang v1.6.0 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/spf13/cobra v0.0.5 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.5.1 diff --git a/go.sum b/go.sum index f7f128bdc9..b438e90a8b 100644 --- a/go.sum +++ b/go.sum @@ -395,6 +395,9 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI= github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/pkg/app/ops/insightcollector/BUILD.bazel b/pkg/app/ops/insightcollector/BUILD.bazel index 924dc0d289..9433b4ab57 100644 --- a/pkg/app/ops/insightcollector/BUILD.bazel +++ b/pkg/app/ops/insightcollector/BUILD.bazel @@ -1,8 +1,32 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = ["collector.go"], importpath = "github.com/pipe-cd/pipe/pkg/app/ops/insightcollector", visibility = ["//visibility:public"], + deps = [ + "//pkg/datastore:go_default_library", + "//pkg/insightstore:go_default_library", + "//pkg/model:go_default_library", + "//pkg/filestore:go_default_library", + "@org_uber_go_zap//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + size = "small", + srcs = [ + "collector_test.go" + ], + embed = [":go_default_library"], + deps = [ + "//pkg/insightstore:go_default_library", + "//pkg/filestore/filestoretest:go_default_library", + "//pkg/datastore:go_default_library", + "//pkg/datastore/datastoretest:go_default_library", + "@com_github_golang_mock//gomock:go_default_library", + "@com_github_stretchr_testify//assert:go_default_library", + ], ) diff --git a/pkg/app/ops/insightcollector/collector.go b/pkg/app/ops/insightcollector/collector.go index 6f25ce8c75..26324449d1 100644 --- a/pkg/app/ops/insightcollector/collector.go +++ b/pkg/app/ops/insightcollector/collector.go @@ -13,3 +13,440 @@ // limitations under the License. package insightcollector + +import ( + "context" + "errors" + "fmt" + "time" + + "go.uber.org/zap" + + "github.com/pipe-cd/pipe/pkg/datastore" + "github.com/pipe-cd/pipe/pkg/filestore" + "github.com/pipe-cd/pipe/pkg/insightstore" + "github.com/pipe-cd/pipe/pkg/model" +) + +var aggregateKinds = []model.InsightMetricsKind{ + model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, + model.InsightMetricsKind_CHANGE_FAILURE_RATE, +} + +// InsightCollector implements the behaviors for the gRPC definitions of InsightCollector. +type InsightCollector struct { + projectStore datastore.ProjectStore + applicationStore datastore.ApplicationStore + deploymentStore datastore.DeploymentStore + insightstore insightstore.Store + logger *zap.Logger +} + +// NewInsightCollector creates a new InsightCollector instance. +func NewInsightCollector( + ds datastore.DataStore, + fs filestore.Store, + logger *zap.Logger, +) *InsightCollector { + a := &InsightCollector{ + projectStore: datastore.NewProjectStore(ds), + applicationStore: datastore.NewApplicationStore(ds), + deploymentStore: datastore.NewDeploymentStore(ds), + insightstore: insightstore.NewStore(fs), + logger: logger.Named("insight-collector"), + } + return a +} + +var ( + pageSize = 50 +) + +func (i *InsightCollector) CollectProjectsInsight(ctx context.Context) error { + now := time.Now().UTC() + maxCreatedAt := now.Unix() + + for { + projects, err := i.projectStore.ListProjects(ctx, datastore.ListOptions{ + PageSize: pageSize, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: "<", + Value: maxCreatedAt, + }, + }, + Orders: []datastore.Order{ + { + Field: "CreatedAt", + Direction: datastore.Desc, + }, + }, + }) + if err != nil { + return err + } + if len(projects) == 0 { + // updated all project's insights completely + break + } + + for _, p := range projects { + for _, k := range aggregateKinds { + if err := i.updateApplicationChunks(ctx, p.Id, "", k, now); err != nil { + i.logger.Error("failed to update application chunks", zap.Error(err)) + } + } + } + maxCreatedAt = projects[len(projects)-1].CreatedAt + } + return nil +} + +func (i *InsightCollector) CollectApplicationInsight(ctx context.Context) error { + now := time.Now().UTC() + maxCreatedAt := now.Unix() + + for { + apps, err := i.applicationStore.ListApplications(ctx, datastore.ListOptions{ + PageSize: pageSize, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: "<", + Value: maxCreatedAt, + }, + }, + Orders: []datastore.Order{ + { + Field: "CreatedAt", + Direction: datastore.Desc, + }, + }, + }) + if err != nil { + return err + } + if len(apps) == 0 { + // updated all application's insights completely + break + } + + for _, app := range apps { + if app.Deleted { + continue + } + for _, k := range aggregateKinds { + + if err := i.updateApplicationChunks(ctx, app.ProjectId, app.Id, k, now); err != nil { + i.logger.Error("failed to update application chunks", zap.Error(err)) + } + } + } + maxCreatedAt = apps[len(apps)-1].CreatedAt + } + return nil +} + +func (i *InsightCollector) updateApplicationChunks( + ctx context.Context, + projectID, appID string, + kind model.InsightMetricsKind, + to time.Time, +) error { + chunkFiles, err := i.insightstore.LoadChunks(ctx, projectID, appID, kind, model.InsightStep_MONTHLY, to, 1) + var chunk insightstore.Chunk + if err == filestore.ErrNotFound { + chunk = insightstore.NewChunk(projectID, kind, model.InsightStep_MONTHLY, appID, to) + } else if err != nil { + return err + } else { + chunk = chunkFiles[0] + } + + yearsFiles, err := i.insightstore.LoadChunks(ctx, projectID, appID, kind, model.InsightStep_YEARLY, to, 1) + var years insightstore.Chunk + if err == filestore.ErrNotFound { + years = insightstore.NewChunk(projectID, kind, model.InsightStep_YEARLY, appID, to) + } else if err != nil { + return err + } else { + years = yearsFiles[0] + } + + chunk, years, err = i.updateChunk(ctx, chunk, years, projectID, appID, kind, to) + if err != nil { + return err + } + + err = i.insightstore.PutChunk(ctx, chunk) + if err != nil { + return err + } + + err = i.insightstore.PutChunk(ctx, years) + if err != nil { + return err + } + + return nil +} + +func (i *InsightCollector) updateChunk( + ctx context.Context, + chunk, years insightstore.Chunk, + projectID, appID string, + kind model.InsightMetricsKind, + to time.Time, +) (insightstore.Chunk, insightstore.Chunk, error) { + accumulatedTo := time.Unix(chunk.GetAccumulatedTo(), 0).UTC() + yearsAccumulatedTo := time.Unix(years.GetAccumulatedTo(), 0).UTC() + + updatedps, accumulateTo, err := i.getDailyInsightData(ctx, projectID, appID, kind, accumulatedTo, to) + if err != nil { + return nil, nil, err + } + + updatedpsForYears, yearAccumulateTo, err := i.getDailyInsightData(ctx, projectID, appID, kind, yearsAccumulatedTo, to) + if err != nil { + return nil, nil, err + } + + for _, s := range model.InsightStep_value { + step := model.InsightStep(s) + if step == model.InsightStep_YEARLY { + chunk, err = i.updateDataPoints(years, step, updatedpsForYears, yearAccumulateTo) + } else { + chunk, err = i.updateDataPoints(chunk, step, updatedps, accumulateTo) + } + if err != nil { + return nil, nil, err + } + } + return chunk, years, nil +} + +func (i *InsightCollector) updateDataPoints(chunk insightstore.Chunk, step model.InsightStep, updatedps []insightstore.DataPoint, accumulatedTo int64) (insightstore.Chunk, error) { + dps, err := chunk.GetDataPoints(step) + if err != nil { + return nil, err + } + + for _, d := range updatedps { + key := insightstore.NormalizeTime(time.Unix(d.GetTimestamp(), 0).UTC(), step) + + dps, err = insightstore.UpdateDataPoint(dps, d, key.Unix()) + if err != nil { + return nil, err + } + } + chunk.SetAccumulatedTo(accumulatedTo) + err = chunk.SetDataPoints(step, dps) + if err != nil { + return nil, err + } + + return chunk, nil +} + +func (i *InsightCollector) getDailyInsightData( + ctx context.Context, + projectID, appID string, + kind model.InsightMetricsKind, + rangeFrom time.Time, + rangeTo time.Time, +) ([]insightstore.DataPoint, int64, error) { + step := model.InsightStep_DAILY + + var movePoint func(time.Time, int) time.Time + movePoint = func(from time.Time, i int) time.Time { + from = insightstore.NormalizeTime(from, step) + return from.AddDate(0, 0, i) + } + + var updatedps []insightstore.DataPoint + + to := movePoint(rangeFrom, 1) + until := movePoint(rangeTo, 1) + var accumulatedTo time.Time + for { + targetTimestamp := insightstore.NormalizeTime(rangeFrom, step).Unix() + + var data insightstore.DataPoint + var a time.Time + var err error + switch kind { + case model.InsightMetricsKind_DEPLOYMENT_FREQUENCY: + data, a, err = i.getInsightDataForDeployFrequency(ctx, projectID, appID, targetTimestamp, rangeFrom, to) + case model.InsightMetricsKind_CHANGE_FAILURE_RATE: + data, a, err = i.getInsightDataForChangeFailureRate(ctx, projectID, appID, targetTimestamp, rangeFrom, to) + default: + return nil, 0, fmt.Errorf("invalid step: %v", kind) + } + if err != nil { + if err == ErrDeploymentNotFound { + if to.Equal(until) { + break + } + rangeFrom = to + to = movePoint(to, 1) + continue + } + return nil, 0, err + } + + updatedps = append(updatedps, data) + rangeFrom = a + accumulatedTo = a + } + + return updatedps, accumulatedTo.Unix(), nil +} + +var ( + ErrDeploymentNotFound = errors.New("deployments not found") +) + +// getInsightDataForDeployFrequency accumulate insight data in target range for deploy frequency. +func (i *InsightCollector) getInsightDataForDeployFrequency( + ctx context.Context, + projectID, applicationID string, + targetTimestamp int64, + from time.Time, + to time.Time) (*insightstore.DeployFrequency, time.Time, error) { + filters := []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: from.Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: to.Unix(), + }, + } + + if applicationID != "" { + filters = append(filters, datastore.ListFilter{ + Field: "ApplicationId", + Operator: "==", + Value: applicationID, + }) + } + + if projectID != "" { + filters = append(filters, datastore.ListFilter{ + Field: "ProjectId", + Operator: "==", + Value: projectID, + }) + } + + deployments, err := i.deploymentStore.ListDeployments(ctx, datastore.ListOptions{ + PageSize: pageSize, + Filters: filters, + }) + if err != nil { + i.logger.Error("failed to get deployments", zap.Error(err)) + return &insightstore.DeployFrequency{}, time.Time{}, fmt.Errorf("failed to get deployments") + } + if len(deployments) == 0 { + return &insightstore.DeployFrequency{}, time.Time{}, ErrDeploymentNotFound + } + + accumulatedTo := from.Unix() + for _, d := range deployments { + if d.CreatedAt > accumulatedTo { + accumulatedTo = d.CreatedAt + } + } + + return &insightstore.DeployFrequency{ + Timestamp: targetTimestamp, + DeployCount: float32(len(deployments)), + }, time.Unix(accumulatedTo, 0).UTC(), nil +} + +// getInsightDataForChangeFailureRate accumulate insight data in target range for change failure rate +func (i *InsightCollector) getInsightDataForChangeFailureRate( + ctx context.Context, + projectID, applicationID string, + targetTimestamp int64, + from time.Time, + to time.Time) (*insightstore.ChangeFailureRate, time.Time, error) { + + filters := []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: from.Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: to.Unix(), + }, + } + + if applicationID != "" { + filters = append(filters, datastore.ListFilter{ + Field: "ApplicationId", + Operator: "==", + Value: applicationID, + }) + } + + if projectID != "" { + filters = append(filters, datastore.ListFilter{ + Field: "ProjectId", + Operator: "==", + Value: projectID, + }) + } + + deployments, err := i.deploymentStore.ListDeployments(ctx, datastore.ListOptions{ + PageSize: pageSize, + Filters: filters, + }) + if err != nil { + i.logger.Error("failed to get deployments", zap.Error(err)) + return &insightstore.ChangeFailureRate{}, time.Time{}, fmt.Errorf("failed to get deployments") + } + + if len(deployments) == 0 { + return &insightstore.ChangeFailureRate{}, time.Time{}, ErrDeploymentNotFound + } + + var successCount int64 + var failureCount int64 + for _, d := range deployments { + switch d.Status { + case model.DeploymentStatus_DEPLOYMENT_SUCCESS: + successCount++ + case model.DeploymentStatus_DEPLOYMENT_FAILURE: + failureCount++ + } + } + + var changeFailureRate float32 + if successCount+failureCount != 0 { + changeFailureRate = float32(failureCount) / float32(successCount+failureCount) + } else { + changeFailureRate = 0 + } + + accumulatedTo := from.Unix() + for _, d := range deployments { + if d.CompletedAt > accumulatedTo { + accumulatedTo = d.CompletedAt + } + } + + return &insightstore.ChangeFailureRate{ + Timestamp: targetTimestamp, + Rate: changeFailureRate, + SuccessCount: successCount, + FailureCount: failureCount, + }, time.Unix(accumulatedTo, 0).UTC(), nil +} diff --git a/pkg/app/ops/insightcollector/collector_test.go b/pkg/app/ops/insightcollector/collector_test.go new file mode 100644 index 0000000000..69d99d9e06 --- /dev/null +++ b/pkg/app/ops/insightcollector/collector_test.go @@ -0,0 +1,1272 @@ +// Copyright 2020 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insightcollector + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/pipe-cd/pipe/pkg/filestore/filestoretest" + + "go.uber.org/zap" + + "github.com/pipe-cd/pipe/pkg/datastore/datastoretest" + + "github.com/golang/mock/gomock" + + "github.com/pipe-cd/pipe/pkg/model" + + "github.com/pipe-cd/pipe/pkg/datastore" + "github.com/pipe-cd/pipe/pkg/insightstore" +) + +func TestInsightCollector_getDailyInsightData(t *testing.T) { + type args struct { + projectID string + appID string + kind model.InsightMetricsKind + rangeFrom time.Time + rangeTo time.Time + } + tests := []struct { + name string + prepareMockDataStoreFn func(m *datastoretest.MockDeploymentStore) + args args + want []insightstore.DataPoint + wantAccumulatedTo int64 + wantErr bool + }{ + { + name: "Deploy Frequency / DAILY", + prepareMockDataStoreFn: func(m *datastoretest.MockDeploymentStore) { + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 11, 4, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Id: "1", + CreatedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "2", + CreatedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "3", + CreatedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Id: "1", + CreatedAt: time.Date(2020, 10, 12, 1, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "2", + CreatedAt: time.Date(2020, 10, 12, 1, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 12, 1, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Id: "1", + CreatedAt: time.Date(2020, 10, 13, 1, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 13, 1, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 10, 15, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + }, + args: args{ + appID: "appID", + kind: model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, + rangeFrom: time.Date(2020, 10, 11, 4, 0, 0, 0, time.UTC), + rangeTo: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC), + }, + want: func() []insightstore.DataPoint { + daily := []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 1, + }, + } + dps, e := insightstore.ToDataPoints(daily) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + wantAccumulatedTo: time.Date(2020, 10, 13, 1, 0, 0, 0, time.UTC).Unix(), + wantErr: false, + }, + { + name: "Change Failure Rate/ DAILY", + prepareMockDataStoreFn: func(m *datastoretest.MockDeploymentStore) { + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 11, 4, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Status: model.DeploymentStatus_DEPLOYMENT_FAILURE, + CompletedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_FAILURE, + CompletedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 11, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Status: model.DeploymentStatus_DEPLOYMENT_FAILURE, + CompletedAt: time.Date(2020, 10, 12, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 12, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 12, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 12, 5, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 12, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{ + { + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 10, 13, 8, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 13, 8, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + m.EXPECT().ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: 50, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 10, 15, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "appID", + }, + }, + Orders: nil, + Cursor: "", + }).Return([]*model.Deployment{}, nil) + }, + args: args{ + appID: "appID", + kind: model.InsightMetricsKind_CHANGE_FAILURE_RATE, + rangeFrom: time.Date(2020, 10, 11, 4, 0, 0, 0, time.UTC), + rangeTo: time.Date(2020, 10, 14, 0, 0, 0, 0, time.UTC), + }, + want: func() []insightstore.DataPoint { + daily := []*insightstore.ChangeFailureRate{ + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.5, + SuccessCount: 2, + FailureCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.25, + SuccessCount: 3, + FailureCount: 1, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0, + SuccessCount: 1, + FailureCount: 0, + }, + } + dps, e := insightstore.ToDataPoints(daily) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + wantAccumulatedTo: time.Date(2020, 10, 13, 8, 0, 0, 0, time.UTC).Unix(), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mock := datastoretest.NewMockDeploymentStore(ctrl) + tt.prepareMockDataStoreFn(mock) + + a := &InsightCollector{ + applicationStore: nil, + deploymentStore: mock, + insightstore: insightstore.NewStore(filestoretest.NewMockStore(ctrl)), + logger: zap.NewNop(), + } + got, accumulatedTo, err := a.getDailyInsightData(context.Background(), tt.args.projectID, tt.args.appID, tt.args.kind, tt.args.rangeFrom, tt.args.rangeTo) + if (err != nil) != tt.wantErr { + if !tt.wantErr { + assert.NoError(t, err) + return + } + assert.Error(t, err, tt.wantErr) + return + } + assert.Equal(t, tt.want, got) + assert.Equal(t, tt.wantAccumulatedTo, accumulatedTo) + }) + } +} + +func TestGetInsightDataForDeployFrequency(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + PageSizeForListDeployments := 50 + tests := []struct { + name string + projectID string + applicationID string + targetRangeFrom time.Time + targetRangeTo time.Time + targetTimestamp int64 + deploymentStore datastore.DeploymentStore + dataPoint *insightstore.DeployFrequency + accumulateTo time.Time + wantErr bool + }{ + { + name: "valid with InsightStep_DAILY", + applicationID: "ApplicationId", + targetRangeFrom: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + targetRangeTo: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), + targetTimestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + deploymentStore: func() datastore.DeploymentStore { + s := datastoretest.NewMockDeploymentStore(ctrl) + s.EXPECT(). + ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: PageSizeForListDeployments, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "ApplicationId", + }, + }, + }).Return([]*model.Deployment{ + { + Id: "id1", + CreatedAt: time.Date(2020, 1, 1, 2, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id2", + CreatedAt: time.Date(2020, 1, 1, 3, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id3", + CreatedAt: time.Date(2020, 1, 1, 6, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + return s + }(), + dataPoint: &insightstore.DeployFrequency{ + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + accumulateTo: time.Date(2020, 1, 1, 6, 0, 0, 0, time.UTC), + wantErr: false, + }, + { + name: "return error when something wrong happen on ListDeployments", + applicationID: "ApplicationId", + targetRangeFrom: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + targetRangeTo: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), + deploymentStore: func() datastore.DeploymentStore { + s := datastoretest.NewMockDeploymentStore(ctrl) + s.EXPECT(). + ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: PageSizeForListDeployments, + Filters: []datastore.ListFilter{ + { + Field: "CreatedAt", + Operator: ">=", + Value: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CreatedAt", + Operator: "<", + Value: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "ApplicationId", + }, + }, + }).Return([]*model.Deployment{}, fmt.Errorf("something wrong happens in ListDeployments")) + return s + }(), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + i := &InsightCollector{ + deploymentStore: tt.deploymentStore, + logger: zap.NewNop(), + } + value, accumulatedTo, err := i.getInsightDataForDeployFrequency(ctx, tt.projectID, tt.applicationID, tt.targetTimestamp, tt.targetRangeFrom, tt.targetRangeTo) + assert.Equal(t, tt.wantErr, err != nil) + if err == nil { + assert.Equal(t, tt.dataPoint, value) + assert.Equal(t, tt.accumulateTo, accumulatedTo) + } + }) + } +} +func TestGetInsightDataForChangeFailureRate(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + PageSizeForListDeployments := 50 + tests := []struct { + name string + projectID string + applicationID string + targetRangeFrom time.Time + targetRangeTo time.Time + targetTimestamp int64 + deploymentStore datastore.DeploymentStore + dataPoint *insightstore.ChangeFailureRate + accumulatedTo time.Time + wantErr bool + }{ + { + name: "valid with InsightStep_DAILY and app id", + applicationID: "ApplicationId", + targetRangeFrom: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + targetRangeTo: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), + targetTimestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + deploymentStore: func() datastore.DeploymentStore { + s := datastoretest.NewMockDeploymentStore(ctrl) + s.EXPECT(). + ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: PageSizeForListDeployments, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "ApplicationId", + }, + }, + }).Return([]*model.Deployment{ + { + Id: "id1", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id2", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id3", + Status: model.DeploymentStatus_DEPLOYMENT_FAILURE, + CompletedAt: time.Date(2020, 1, 1, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id4", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + + return s + }(), + dataPoint: &insightstore.ChangeFailureRate{ + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.25, + SuccessCount: 3, + FailureCount: 1, + }, + accumulatedTo: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC), + wantErr: false, + }, + { + name: "valid with InsightStep_DAILY and projectId", + projectID: "ProjectId", + targetRangeFrom: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + targetRangeTo: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), + targetTimestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + deploymentStore: func() datastore.DeploymentStore { + s := datastoretest.NewMockDeploymentStore(ctrl) + s.EXPECT(). + ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: PageSizeForListDeployments, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ProjectId", + Operator: "==", + Value: "ProjectId", + }, + }, + }).Return([]*model.Deployment{ + { + Id: "id1", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id2", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id3", + Status: model.DeploymentStatus_DEPLOYMENT_FAILURE, + CompletedAt: time.Date(2020, 1, 1, 5, 0, 0, 0, time.UTC).Unix(), + }, + { + Id: "id4", + Status: model.DeploymentStatus_DEPLOYMENT_SUCCESS, + CompletedAt: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC).Unix(), + }, + }, nil) + + return s + }(), + dataPoint: &insightstore.ChangeFailureRate{ + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.25, + SuccessCount: 3, + FailureCount: 1, + }, + accumulatedTo: time.Date(2020, 1, 1, 8, 0, 0, 0, time.UTC), + wantErr: false, + }, + { + name: "return error when something wrong happen on ListDeployments", + applicationID: "ApplicationId", + targetRangeFrom: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + targetRangeTo: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC), + deploymentStore: func() datastore.DeploymentStore { + s := datastoretest.NewMockDeploymentStore(ctrl) + s.EXPECT(). + ListDeployments(gomock.Any(), datastore.ListOptions{ + PageSize: PageSizeForListDeployments, + Filters: []datastore.ListFilter{ + { + Field: "CompletedAt", + Operator: ">=", + Value: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "CompletedAt", + Operator: "<", + Value: time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + Field: "ApplicationId", + Operator: "==", + Value: "ApplicationId", + }, + }, + }).Return([]*model.Deployment{}, fmt.Errorf("something wrong happens in ListDeployments")) + return s + }(), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + i := &InsightCollector{ + deploymentStore: tt.deploymentStore, + logger: zap.NewNop(), + } + value, accumulatedTo, err := i.getInsightDataForChangeFailureRate(ctx, tt.projectID, tt.applicationID, tt.targetTimestamp, tt.targetRangeFrom, tt.targetRangeTo) + assert.Equal(t, tt.wantErr, err != nil) + if err == nil { + assert.Equal(t, tt.dataPoint, value) + assert.Equal(t, tt.accumulatedTo, accumulatedTo) + } + }) + } +} + +func TestInsightCollector_updateDataPoints(t *testing.T) { + type args struct { + chunk insightstore.Chunk + step model.InsightStep + updatedps []insightstore.DataPoint + accumulatedTo int64 + } + tests := []struct { + name string + args args + want insightstore.Chunk + wantErr bool + }{ + { + name: "success with daily and deploy frequency", + args: args{ + chunk: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 11, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Daily: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 10, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + }, + Weekly: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + step: model.InsightStep_DAILY, + updatedps: func() []insightstore.DataPoint { + daily := []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 1, + }, + } + dps, e := insightstore.ToDataPoints(daily) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + accumulatedTo: time.Date(2020, 10, 13, 1, 0, 0, 0, time.UTC).Unix(), + }, + want: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 13, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Daily: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 10, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 1, + }, + }, + Weekly: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + }, + { + name: "success with weekly and deploy frequency", + args: args{ + chunk: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 11, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Weekly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 4, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + }, + Daily: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + step: model.InsightStep_WEEKLY, + updatedps: func() []insightstore.DataPoint { + df := []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + } + dps, e := insightstore.ToDataPoints(df) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + accumulatedTo: time.Date(2020, 10, 13, 3, 0, 0, 0, time.UTC).Unix(), + }, + want: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 13, 3, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Weekly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 4, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + }, + Daily: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + }, + { + name: "success with monthly and deploy frequency", + args: args{ + chunk: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 11, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Monthly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + }, + Daily: nil, + Weekly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + step: model.InsightStep_MONTHLY, + updatedps: func() []insightstore.DataPoint { + df := []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + { + Timestamp: time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + } + dps, e := insightstore.ToDataPoints(df) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + accumulatedTo: time.Date(2020, 11, 13, 3, 0, 0, 0, time.UTC).Unix(), + }, + want: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 11, 13, 3, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Monthly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 10, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 13, + }, + { + Timestamp: time.Date(2020, 11, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + }, + Daily: nil, + Weekly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + }, + { + name: "success with yearly and deploy frequency", + args: args{ + chunk: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2020, 10, 11, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Yearly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 10, + }, + }, + Daily: nil, + Weekly: nil, + Monthly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + step: model.InsightStep_YEARLY, + updatedps: func() []insightstore.DataPoint { + df := []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 3, + }, + { + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + } + dps, e := insightstore.ToDataPoints(df) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + accumulatedTo: time.Date(2021, 1, 13, 3, 0, 0, 0, time.UTC).Unix(), + }, + want: func() insightstore.Chunk { + df := &insightstore.DeployFrequencyChunk{ + AccumulatedTo: time.Date(2021, 1, 13, 3, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.DeployFrequencyDataPoint{ + Yearly: []*insightstore.DeployFrequency{ + { + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 13, + }, + { + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + DeployCount: 7, + }, + }, + Daily: nil, + Weekly: nil, + Monthly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + }, + { + name: "success with daily and change failure rate", + args: args{ + chunk: func() insightstore.Chunk { + df := &insightstore.ChangeFailureRateChunk{ + AccumulatedTo: time.Date(2020, 10, 11, 1, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.ChangeFailureRateDataPoint{ + Daily: []*insightstore.ChangeFailureRate{ + { + Timestamp: time.Date(2020, 10, 10, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0, + SuccessCount: 10, + FailureCount: 0, + }, + }, + Weekly: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + step: model.InsightStep_DAILY, + updatedps: func() []insightstore.DataPoint { + daily := []*insightstore.ChangeFailureRate{ + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.5, + SuccessCount: 2, + FailureCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.25, + SuccessCount: 3, + FailureCount: 1, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0, + SuccessCount: 1, + FailureCount: 0, + }, + } + dps, e := insightstore.ToDataPoints(daily) + if e != nil { + t.Fatalf("error when convert to data points: %v", e) + } + return dps + }(), + accumulatedTo: time.Date(2020, 10, 13, 8, 0, 0, 0, time.UTC).Unix(), + }, + want: func() insightstore.Chunk { + df := &insightstore.ChangeFailureRateChunk{ + AccumulatedTo: time.Date(2020, 10, 13, 8, 0, 0, 0, time.UTC).Unix(), + DataPoints: insightstore.ChangeFailureRateDataPoint{ + Daily: []*insightstore.ChangeFailureRate{ + { + Timestamp: time.Date(2020, 10, 10, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0, + SuccessCount: 10, + FailureCount: 0, + }, + { + Timestamp: time.Date(2020, 10, 11, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.5, + SuccessCount: 2, + FailureCount: 2, + }, + { + Timestamp: time.Date(2020, 10, 12, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0.25, + SuccessCount: 3, + FailureCount: 1, + }, + { + Timestamp: time.Date(2020, 10, 13, 0, 0, 0, 0, time.UTC).Unix(), + Rate: 0, + SuccessCount: 1, + FailureCount: 0, + }, + }, + Weekly: nil, + Monthly: nil, + Yearly: nil, + }, + FilePath: "", + } + c, e := insightstore.ToChunk(df) + if e != nil { + t.Fatalf("error when convert to chunk: %v", e) + } + return c + }(), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + a := &InsightCollector{} + got, err := a.updateDataPoints(tt.args.chunk, tt.args.step, tt.args.updatedps, tt.args.accumulatedTo) + if (err != nil) != tt.wantErr { + if !tt.wantErr { + assert.NoError(t, err) + return + } + assert.Error(t, err, tt.wantErr) + return + } + + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/config/control_plane.go b/pkg/config/control_plane.go index f02ab09559..fdfbab4ccf 100644 --- a/pkg/config/control_plane.go +++ b/pkg/config/control_plane.go @@ -37,6 +37,8 @@ type ControlPlaneSpec struct { Filestore ControlPlaneFileStore `json:"filestore"` // The configuration of cache for control plane. Cache ControlPlaneCache `json:"cache"` + // The configuration of insight collector. + InsightCollector ControlPlaneInsightCollector `json:"insightCollector"` // List of debugging/quickstart projects defined in Control Plane configuration. // Please note that do not use this to configure the projects running in the production. Projects []ControlPlaneProject `json:"projects"` @@ -183,6 +185,10 @@ type ControlPlaneCache struct { TTL Duration `json:"ttl"` } +type ControlPlaneInsightCollector struct { + Schedule string `json:"schedule"` +} + func (c ControlPlaneCache) TTLDuration() time.Duration { const defaultTTL = 5 * time.Minute diff --git a/pkg/config/control_plane_test.go b/pkg/config/control_plane_test.go index 3eed5ee6e6..d933bb1828 100644 --- a/pkg/config/control_plane_test.go +++ b/pkg/config/control_plane_test.go @@ -80,6 +80,9 @@ func TestControlPlaneConfig(t *testing.T) { Cache: ControlPlaneCache{ TTL: Duration(5 * time.Minute), }, + InsightCollector: ControlPlaneInsightCollector{ + Schedule: "0 0 * * *", + }, }, }, } diff --git a/pkg/config/testdata/control-plane/control-plane-config.yaml b/pkg/config/testdata/control-plane/control-plane-config.yaml index c864208b1c..164782ac9e 100644 --- a/pkg/config/testdata/control-plane/control-plane-config.yaml +++ b/pkg/config/testdata/control-plane/control-plane-config.yaml @@ -32,3 +32,6 @@ spec: cache: ttl: 5m + + insightCollector: + schedule: "0 0 * * *" diff --git a/pkg/insightstore/BUILD.bazel b/pkg/insightstore/BUILD.bazel index 934c442be2..5a57b108e7 100644 --- a/pkg/insightstore/BUILD.bazel +++ b/pkg/insightstore/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "chunk.go", "filestore.go", "filepath.go", + "datapoint.go" ], importpath = "github.com/pipe-cd/pipe/pkg/insightstore", visibility = ["//visibility:public"], @@ -23,6 +24,7 @@ go_test( srcs = [ "filestore_test.go", "chunk_test.go", + "datapoint_test.go", "filepath_test.go" ], embed = [":go_default_library"], diff --git a/pkg/insightstore/chunk.go b/pkg/insightstore/chunk.go index a91c8cfdcf..17da5f5d55 100644 --- a/pkg/insightstore/chunk.go +++ b/pkg/insightstore/chunk.go @@ -31,16 +31,10 @@ type DeployFrequencyChunk struct { } type DeployFrequencyDataPoint struct { - Daily []DeployFrequency `json:"daily"` - Weekly []DeployFrequency `json:"weekly"` - Monthly []DeployFrequency `json:"monthly"` - Yearly []DeployFrequency `json:"yearly"` -} - -// DeployFrequency represents a data point that shows the deployment frequency metrics. -type DeployFrequency struct { - Timestamp int64 `json:"timestamp"` - DeployCount float32 `json:"deploy_count"` + Daily []*DeployFrequency `json:"daily"` + Weekly []*DeployFrequency `json:"weekly"` + Monthly []*DeployFrequency `json:"monthly"` + Yearly []*DeployFrequency `json:"yearly"` } func (c *DeployFrequencyChunk) GetFilePath() string { @@ -59,40 +53,38 @@ func (c *DeployFrequencyChunk) SetAccumulatedTo(a int64) { c.AccumulatedTo = a } -func (c *DeployFrequencyChunk) DataCount(step model.InsightStep) int { +func (c *DeployFrequencyChunk) GetDataPoints(step model.InsightStep) ([]DataPoint, error) { switch step { case model.InsightStep_YEARLY: - return len(c.DataPoints.Yearly) + return ToDataPoints(c.DataPoints.Yearly) case model.InsightStep_MONTHLY: - return len(c.DataPoints.Monthly) + return ToDataPoints(c.DataPoints.Monthly) case model.InsightStep_WEEKLY: - return len(c.DataPoints.Weekly) + return ToDataPoints(c.DataPoints.Weekly) case model.InsightStep_DAILY: - return len(c.DataPoints.Daily) + return ToDataPoints(c.DataPoints.Daily) } - return 0 + return nil, fmt.Errorf("invalid step: %v", step) } -func (c *DeployFrequencyChunk) GetDataPoints(step model.InsightStep) ([]DataPoint, error) { +func (c *DeployFrequencyChunk) SetDataPoints(step model.InsightStep, points []DataPoint) error { + dfs := make([]*DeployFrequency, len(points)) + for i, p := range points { + dfs[i] = p.(*DeployFrequency) + } switch step { case model.InsightStep_YEARLY: - return toDataPoints(c.DataPoints.Yearly) + c.DataPoints.Yearly = dfs case model.InsightStep_MONTHLY: - return toDataPoints(c.DataPoints.Monthly) + c.DataPoints.Monthly = dfs case model.InsightStep_WEEKLY: - return toDataPoints(c.DataPoints.Weekly) + c.DataPoints.Weekly = dfs case model.InsightStep_DAILY: - return toDataPoints(c.DataPoints.Daily) + c.DataPoints.Daily = dfs + default: + return fmt.Errorf("invalid step: %v", step) } - return nil, fmt.Errorf("invalid step: %v", step) -} - -func (d DeployFrequency) GetTimestamp() int64 { - return d.Timestamp -} - -func (d DeployFrequency) Value() float32 { - return d.DeployCount + return nil } // change failure rate @@ -105,18 +97,10 @@ type ChangeFailureRateChunk struct { } type ChangeFailureRateDataPoint struct { - Daily []ChangeFailureRate `json:"daily"` - Weekly []ChangeFailureRate `json:"weekly"` - Monthly []ChangeFailureRate `json:"monthly"` - Yearly []ChangeFailureRate `json:"yearly"` -} - -// ChangeFailureRate represents a data point that shows the change failure rate metrics. -type ChangeFailureRate struct { - Timestamp int64 `json:"timestamp"` - Rate float32 `json:"rate"` - SuccessCount int64 `json:"success_count"` - FailureCount int64 `json:"failure_count"` + Daily []*ChangeFailureRate `json:"daily"` + Weekly []*ChangeFailureRate `json:"weekly"` + Monthly []*ChangeFailureRate `json:"monthly"` + Yearly []*ChangeFailureRate `json:"yearly"` } func (c *ChangeFailureRateChunk) GetFilePath() string { @@ -138,37 +122,35 @@ func (c *ChangeFailureRateChunk) SetAccumulatedTo(a int64) { func (c *ChangeFailureRateChunk) GetDataPoints(step model.InsightStep) ([]DataPoint, error) { switch step { case model.InsightStep_YEARLY: - return toDataPoints(c.DataPoints.Yearly) + return ToDataPoints(c.DataPoints.Yearly) case model.InsightStep_MONTHLY: - return toDataPoints(c.DataPoints.Monthly) + return ToDataPoints(c.DataPoints.Monthly) case model.InsightStep_WEEKLY: - return toDataPoints(c.DataPoints.Weekly) + return ToDataPoints(c.DataPoints.Weekly) case model.InsightStep_DAILY: - return toDataPoints(c.DataPoints.Daily) + return ToDataPoints(c.DataPoints.Daily) } return nil, fmt.Errorf("invalid step: %v", step) } -func (c *ChangeFailureRateChunk) DataCount(step model.InsightStep) int { +func (c *ChangeFailureRateChunk) SetDataPoints(step model.InsightStep, points []DataPoint) error { + cfs := make([]*ChangeFailureRate, len(points)) + for i, p := range points { + cfs[i] = p.(*ChangeFailureRate) + } switch step { case model.InsightStep_YEARLY: - return len(c.DataPoints.Yearly) + c.DataPoints.Yearly = cfs case model.InsightStep_MONTHLY: - return len(c.DataPoints.Monthly) + c.DataPoints.Monthly = cfs case model.InsightStep_WEEKLY: - return len(c.DataPoints.Weekly) + c.DataPoints.Weekly = cfs case model.InsightStep_DAILY: - return len(c.DataPoints.Daily) + c.DataPoints.Daily = cfs + default: + return fmt.Errorf("invalid step: %v", step) } - return 0 -} - -func (c ChangeFailureRate) GetTimestamp() int64 { - return c.Timestamp -} - -func (c ChangeFailureRate) Value() float32 { - return c.Rate + return nil } type Chunk interface { @@ -180,14 +162,41 @@ type Chunk interface { GetAccumulatedTo() int64 // SetAccumulatedTo sets AccumulatedTo SetAccumulatedTo(a int64) - // GetDataPoint gets list of data points of specify step + // GetDataPoints gets list of data points of specify step GetDataPoints(step model.InsightStep) ([]DataPoint, error) - // DataCount returns count of data in specify step - DataCount(step model.InsightStep) int + // SetDataPoints sets list of data points of specify step + SetDataPoints(step model.InsightStep, points []DataPoint) error +} + +func NewChunk(projectID string, metricsKind model.InsightMetricsKind, step model.InsightStep, appID string, timestamp time.Time) Chunk { + var path string + switch step { + case model.InsightStep_YEARLY: + path = makeYearsFilePath(projectID, metricsKind, appID) + default: + month := determineChunkKeys(step, timestamp, 1) + path = makeChunkFilePath(projectID, metricsKind, appID, month[0]) + } + + var chunk Chunk + switch metricsKind { + case model.InsightMetricsKind_DEPLOYMENT_FREQUENCY: + chunk = &DeployFrequencyChunk{ + FilePath: path, + } + case model.InsightMetricsKind_CHANGE_FAILURE_RATE: + chunk = &ChangeFailureRateChunk{ + FilePath: path, + } + default: + return nil + } + + return chunk } // convert types to Chunk. -func toChunk(i interface{}) (Chunk, error) { +func ToChunk(i interface{}) (Chunk, error) { switch p := i.(type) { case *DeployFrequencyChunk: return p, nil @@ -198,33 +207,6 @@ func toChunk(i interface{}) (Chunk, error) { } } -type DataPoint interface { - // Value gets data for model.InsightDataPoint - Value() float32 - // Timestamp gets timestamp - GetTimestamp() int64 -} - -// convert types to list of DataPoint. -func toDataPoints(i interface{}) ([]DataPoint, error) { - switch dps := i.(type) { - case []DeployFrequency: - dataPoints := make([]DataPoint, len(dps)) - for j, dp := range dps { - dataPoints[j] = dp - } - return dataPoints, nil - case []ChangeFailureRate: - dataPoints := make([]DataPoint, len(dps)) - for j, dp := range dps { - dataPoints[j] = dp - } - return dataPoints, nil - default: - return nil, fmt.Errorf("cannot convert to DataPoints: %v", dps) - } -} - type Chunks []Chunk func (cs Chunks) ExtractDataPoints(step model.InsightStep, from time.Time, count int) ([]*model.InsightDataPoint, error) { @@ -242,7 +224,12 @@ func (cs Chunks) ExtractDataPoints(step model.InsightStep, from time.Time, count } for _, c := range cs { - idp, err := extractDataPoints(c, step, from, to) + dp, err := c.GetDataPoints(step) + if err != nil { + return nil, err + } + + idp, err := extractDataPoints(dp, from, to) if err != nil { return nil, err } @@ -252,26 +239,3 @@ func (cs Chunks) ExtractDataPoints(step model.InsightStep, from time.Time, count return out, nil } - -func extractDataPoints(chunk Chunk, step model.InsightStep, from, to time.Time) ([]*model.InsightDataPoint, error) { - target, err := chunk.GetDataPoints(step) - if err != nil { - return nil, err - } - - var result []*model.InsightDataPoint - for _, d := range target { - ts := d.GetTimestamp() - if ts > to.Unix() { - break - } - - if ts >= from.Unix() { - result = append(result, &model.InsightDataPoint{ - Timestamp: ts, - Value: d.Value(), - }) - } - } - return result, nil -} diff --git a/pkg/insightstore/chunk_test.go b/pkg/insightstore/chunk_test.go index 8306f854ba..b581334f3e 100644 --- a/pkg/insightstore/chunk_test.go +++ b/pkg/insightstore/chunk_test.go @@ -24,185 +24,7 @@ import ( "github.com/pipe-cd/pipe/pkg/model" ) -func Test_ExtractDataPoints(t *testing.T) { - type args struct { - chunk Chunk - from time.Time - to time.Time - step model.InsightStep - } - tests := []struct { - name string - args args - want []*model.InsightDataPoint - wantErr bool - }{ - { - name: "success with yearly", - args: args{ - chunk: func() Chunk { - path := makeYearsFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID") - expected := DeployFrequencyChunk{ - AccumulatedTo: 1609459200, - DataPoints: DeployFrequencyDataPoint{ - Yearly: []DeployFrequency{ - { - DeployCount: 1000, - Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - }, - { - DeployCount: 3000, - Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - }, - }, - }, - FilePath: path, - } - chunk, _ := toChunk(&expected) - return chunk - }(), - from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), - to: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), - step: model.InsightStep_YEARLY, - }, - want: []*model.InsightDataPoint{ - { - Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - Value: 1000, - }, - { - Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - Value: 3000, - }, - }, - }, - { - name: "success with monthly", - args: args{ - chunk: func() Chunk { - path := makeYearsFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID") - expected := DeployFrequencyChunk{ - AccumulatedTo: 1609459200, - DataPoints: DeployFrequencyDataPoint{ - Monthly: []DeployFrequency{ - { - DeployCount: 1000, - Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - }, - }, - }, - FilePath: path, - } - chunk, _ := toChunk(&expected) - return chunk - }(), - from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), - to: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), - step: model.InsightStep_MONTHLY, - }, - want: []*model.InsightDataPoint{ - { - Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), - Value: 1000, - }, - }, - }, - { - name: "success with weekly", - args: args{ - chunk: func() Chunk { - path := makeYearsFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID") - expected := DeployFrequencyChunk{ - AccumulatedTo: 1609459200, - DataPoints: DeployFrequencyDataPoint{ - Weekly: []DeployFrequency{ - { - DeployCount: 1000, - Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), - }, - { - DeployCount: 3000, - Timestamp: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC).Unix(), - }, - }, - }, - FilePath: path, - } - chunk, _ := toChunk(&expected) - return chunk - }(), - from: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), - to: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC), - step: model.InsightStep_WEEKLY, - }, - want: []*model.InsightDataPoint{ - { - Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), - Value: 1000, - }, - { - Timestamp: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC).Unix(), - Value: 3000, - }, - }, - }, - { - name: "success with daily", - args: args{ - chunk: func() Chunk { - path := makeYearsFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID") - expected := DeployFrequencyChunk{ - AccumulatedTo: 1609459200, - DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ - { - DeployCount: 1000, - Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), - }, - { - DeployCount: 3000, - Timestamp: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC).Unix(), - }, - }, - }, - FilePath: path, - } - chunk, _ := toChunk(&expected) - return chunk - }(), - from: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), - to: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), - step: model.InsightStep_DAILY, - }, - want: []*model.InsightDataPoint{ - { - Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), - Value: 1000, - }, - { - Timestamp: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC).Unix(), - Value: 3000, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := extractDataPoints(tt.args.chunk, tt.args.step, tt.args.from, tt.args.to) - if (err != nil) != tt.wantErr { - if !tt.wantErr { - assert.NoError(t, err) - return - } - assert.Error(t, err, tt.wantErr) - return - } - assert.Equal(t, tt.want, got) - }) - } -} - -func TestChunksToDataPoints(t *testing.T) { +func TestExtractDataPoints(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -234,7 +56,7 @@ func TestChunksToDataPoints(t *testing.T) { expected1 := DeployFrequencyChunk{ AccumulatedTo: 1609459200, DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ + Daily: []*DeployFrequency{ { DeployCount: 1000, Timestamp: 1612051200, @@ -243,12 +65,12 @@ func TestChunksToDataPoints(t *testing.T) { }, FilePath: path, } - chunk1, _ := toChunk(&expected1) + chunk1, _ := ToChunk(&expected1) path = makeChunkFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID", "2021-02") expected2 := DeployFrequencyChunk{ AccumulatedTo: 1612123592, DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ + Daily: []*DeployFrequency{ { DeployCount: 3000, Timestamp: 1612137600, @@ -257,7 +79,7 @@ func TestChunksToDataPoints(t *testing.T) { }, FilePath: path, } - chunk2, _ := toChunk(&expected2) + chunk2, _ := ToChunk(&expected2) return []Chunk{chunk1, chunk2} }(), expected: []*model.InsightDataPoint{ diff --git a/pkg/insightstore/datapoint.go b/pkg/insightstore/datapoint.go new file mode 100644 index 0000000000..56608a8610 --- /dev/null +++ b/pkg/insightstore/datapoint.go @@ -0,0 +1,162 @@ +// Copyright 2020 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insightstore + +import ( + "errors" + "fmt" + "time" + + "github.com/pipe-cd/pipe/pkg/model" +) + +var ( + ErrNotFound = errors.New("data point not found") +) + +// DeployFrequency represents a data point that shows the deployment frequency metrics. +type DeployFrequency struct { + Timestamp int64 `json:"timestamp"` + DeployCount float32 `json:"deploy_count"` +} + +func (d *DeployFrequency) GetTimestamp() int64 { + return d.Timestamp +} + +func (d *DeployFrequency) Value() float32 { + return d.DeployCount +} + +func (d *DeployFrequency) Merge(point DataPoint) error { + if point == nil { + return nil + } + + df, ok := point.(*DeployFrequency) + if !ok { + return fmt.Errorf("can not cast to DataPoint to DeployFrequency, %v", point) + } + + if df.Timestamp != d.Timestamp { + return fmt.Errorf("mismatch timestamp. want: %d, acutual: %d", d.Timestamp, df.Timestamp) + } + + d.DeployCount += df.DeployCount + return nil + +} + +// ChangeFailureRate represents a data point that shows the change failure rate metrics. +type ChangeFailureRate struct { + Timestamp int64 `json:"timestamp"` + Rate float32 `json:"rate"` + SuccessCount int64 `json:"success_count"` + FailureCount int64 `json:"failure_count"` +} + +func (c *ChangeFailureRate) GetTimestamp() int64 { + return c.Timestamp +} + +func (c *ChangeFailureRate) Value() float32 { + return c.Rate +} + +func (c *ChangeFailureRate) Merge(point DataPoint) error { + if point == nil { + return nil + } + + cfr, ok := point.(*ChangeFailureRate) + if !ok { + return fmt.Errorf("can not cast to DataPoint to ChangeFailureRate, %v", point) + } + + if cfr.Timestamp != c.Timestamp { + return fmt.Errorf("mismatch timestamp. want: %d, acutual: %d", c.Timestamp, cfr.Timestamp) + } + + c.FailureCount += cfr.FailureCount + c.SuccessCount += cfr.SuccessCount + c.Rate = float32(c.FailureCount) / float32(c.FailureCount+c.SuccessCount) + return nil +} + +type DataPoint interface { + // Value gets data for model.InsightDataPoint. + Value() float32 + // Timestamp gets timestamp. + GetTimestamp() int64 + // Merge merges other DataPoint. + Merge(point DataPoint) error +} + +// ToDataPoints converts a list of concrete points into the list of DataPoints +func ToDataPoints(i interface{}) ([]DataPoint, error) { + switch dps := i.(type) { + case []*DeployFrequency: + dataPoints := make([]DataPoint, len(dps)) + for j, dp := range dps { + dataPoints[j] = dp + } + return dataPoints, nil + case []*ChangeFailureRate: + dataPoints := make([]DataPoint, len(dps)) + for j, dp := range dps { + dataPoints[j] = dp + } + return dataPoints, nil + default: + return nil, fmt.Errorf("cannot convert to DataPoints: %v", dps) + } +} + +// UpdateDataPoint sets data point +func UpdateDataPoint(dp []DataPoint, point DataPoint, timestamp int64) ([]DataPoint, error) { + latestData := dp[len(dp)-1] + if timestamp < latestData.GetTimestamp() { + return nil, fmt.Errorf("invalid timestamp") + } + + if timestamp == latestData.GetTimestamp() { + err := latestData.Merge(point) + if err != nil { + return nil, err + } + dp[len(dp)-1] = latestData + } else { + dp = append(dp, point) + } + return dp, nil +} + +func extractDataPoints(dp []DataPoint, from, to time.Time) ([]*model.InsightDataPoint, error) { + var result []*model.InsightDataPoint + for _, d := range dp { + ts := d.GetTimestamp() + if ts > to.Unix() { + break + } + + if ts >= from.Unix() { + result = append(result, &model.InsightDataPoint{ + Timestamp: ts, + Value: d.Value(), + }) + } + } + return result, nil +} diff --git a/pkg/insightstore/datapoint_test.go b/pkg/insightstore/datapoint_test.go new file mode 100644 index 0000000000..aec38eeaf2 --- /dev/null +++ b/pkg/insightstore/datapoint_test.go @@ -0,0 +1,169 @@ +// Copyright 2020 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package insightstore + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/pipe-cd/pipe/pkg/model" +) + +func Test_ExtractDataPoints(t *testing.T) { + type args struct { + datapoints []DataPoint + from time.Time + to time.Time + } + tests := []struct { + name string + args args + want []*model.InsightDataPoint + wantErr bool + }{ + { + name: "success with yearly", + args: args{ + datapoints: func() []DataPoint { + df := []*DeployFrequency{ + { + DeployCount: 1000, + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + DeployCount: 3000, + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + } + dp, _ := ToDataPoints(df) + return dp + }(), + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + }, + want: []*model.InsightDataPoint{ + { + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + Value: 1000, + }, + { + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + Value: 3000, + }, + }, + }, + { + name: "success with monthly", + args: args{ + datapoints: func() []DataPoint { + df := []*DeployFrequency{ + { + DeployCount: 1000, + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + }, + } + dp, _ := ToDataPoints(df) + return dp + }(), + from: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + to: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC), + }, + want: []*model.InsightDataPoint{ + { + Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), + Value: 1000, + }, + }, + }, + { + name: "success with weekly", + args: args{ + datapoints: func() []DataPoint { + df := []*DeployFrequency{ + { + DeployCount: 1000, + Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + DeployCount: 3000, + Timestamp: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC).Unix(), + }, + } + dp, _ := ToDataPoints(df) + return dp + }(), + from: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC), + }, + want: []*model.InsightDataPoint{ + { + Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), + Value: 1000, + }, + { + Timestamp: time.Date(2021, 1, 10, 0, 0, 0, 0, time.UTC).Unix(), + Value: 3000, + }, + }, + }, + { + name: "success with daily", + args: args{ + datapoints: func() []DataPoint { + df := []*DeployFrequency{ + { + DeployCount: 1000, + Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), + }, + { + DeployCount: 3000, + Timestamp: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC).Unix(), + }, + } + dp, _ := ToDataPoints(df) + return dp + }(), + from: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC), + to: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC), + }, + want: []*model.InsightDataPoint{ + { + Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), + Value: 1000, + }, + { + Timestamp: time.Date(2021, 1, 4, 0, 0, 0, 0, time.UTC).Unix(), + Value: 3000, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := extractDataPoints(tt.args.datapoints, tt.args.from, tt.args.to) + if (err != nil) != tt.wantErr { + if !tt.wantErr { + assert.NoError(t, err) + return + } + assert.Error(t, err, tt.wantErr) + return + } + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/insightstore/filestore.go b/pkg/insightstore/filestore.go index 1e48f21346..2c882e8cef 100644 --- a/pkg/insightstore/filestore.go +++ b/pkg/insightstore/filestore.go @@ -41,7 +41,7 @@ func (s *Store) LoadChunks( from time.Time, count int, ) ([]Chunk, error) { - from = normalizeTime(from, step) + from = NormalizeTime(from, step) paths := determineFilePaths(projectID, appID, kind, step, from, count) var chunks []Chunk for _, p := range paths { @@ -88,7 +88,7 @@ func (s *Store) getChunk(ctx context.Context, path string, kind model.InsightMet if err != nil { return nil, err } - chunk, err := toChunk(c) + chunk, err := ToChunk(c) if err != nil { return nil, err } @@ -97,7 +97,7 @@ func (s *Store) getChunk(ctx context.Context, path string, kind model.InsightMet return chunk, nil } -func normalizeTime(from time.Time, step model.InsightStep) time.Time { +func NormalizeTime(from time.Time, step model.InsightStep) time.Time { var formattedTime time.Time switch step { case model.InsightStep_DAILY: diff --git a/pkg/insightstore/filestore_test.go b/pkg/insightstore/filestore_test.go index c60ce3b425..d8e6308209 100644 --- a/pkg/insightstore/filestore_test.go +++ b/pkg/insightstore/filestore_test.go @@ -83,7 +83,7 @@ func TestGetChunks(t *testing.T) { expected1 := DeployFrequencyChunk{ AccumulatedTo: 1612051200, DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ + Daily: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2021, 1, 31, 0, 0, 0, 0, time.UTC).Unix(), @@ -92,12 +92,12 @@ func TestGetChunks(t *testing.T) { }, FilePath: path, } - chunk1, _ := toChunk(&expected1) + chunk1, _ := ToChunk(&expected1) path = makeChunkFilePath("projectID", model.InsightMetricsKind_DEPLOYMENT_FREQUENCY, "appID", "2021-02") expected2 := DeployFrequencyChunk{ AccumulatedTo: 1612137600, DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ + Daily: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2021, 2, 1, 0, 0, 0, 0, time.UTC).Unix(), @@ -106,7 +106,7 @@ func TestGetChunks(t *testing.T) { }, FilePath: path, } - chunk2, _ := toChunk(&expected2) + chunk2, _ := ToChunk(&expected2) return []Chunk{chunk1, chunk2} }(), }, @@ -202,7 +202,7 @@ func TestGetChunk(t *testing.T) { expected := DeployFrequencyChunk{ AccumulatedTo: 1609459200, DataPoints: DeployFrequencyDataPoint{ - Yearly: []DeployFrequency{ + Yearly: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), @@ -215,7 +215,7 @@ func TestGetChunk(t *testing.T) { }, FilePath: path, } - chunk, _ := toChunk(&expected) + chunk, _ := ToChunk(&expected) return chunk }(), }, @@ -243,7 +243,7 @@ func TestGetChunk(t *testing.T) { expected := DeployFrequencyChunk{ AccumulatedTo: 1609459200, DataPoints: DeployFrequencyDataPoint{ - Monthly: []DeployFrequency{ + Monthly: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).Unix(), @@ -252,7 +252,7 @@ func TestGetChunk(t *testing.T) { }, FilePath: path, } - chunk, _ := toChunk(&expected) + chunk, _ := ToChunk(&expected) return chunk }(), }, @@ -284,7 +284,7 @@ func TestGetChunk(t *testing.T) { expected := DeployFrequencyChunk{ AccumulatedTo: 1609459200, DataPoints: DeployFrequencyDataPoint{ - Weekly: []DeployFrequency{ + Weekly: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), @@ -297,7 +297,7 @@ func TestGetChunk(t *testing.T) { }, FilePath: path, } - chunk, _ := toChunk(&expected) + chunk, _ := ToChunk(&expected) return chunk }(), }, @@ -329,7 +329,7 @@ func TestGetChunk(t *testing.T) { expected := DeployFrequencyChunk{ AccumulatedTo: 1609459200, DataPoints: DeployFrequencyDataPoint{ - Daily: []DeployFrequency{ + Daily: []*DeployFrequency{ { DeployCount: 1000, Timestamp: time.Date(2021, 1, 3, 0, 0, 0, 0, time.UTC).Unix(), @@ -342,7 +342,7 @@ func TestGetChunk(t *testing.T) { }, FilePath: path, } - chunk, _ := toChunk(&expected) + chunk, _ := ToChunk(&expected) return chunk }(), }, @@ -379,7 +379,7 @@ func TestGetChunk(t *testing.T) { expected := ChangeFailureRateChunk{ AccumulatedTo: 1609459200, DataPoints: ChangeFailureRateDataPoint{ - Yearly: []ChangeFailureRate{ + Yearly: []*ChangeFailureRate{ { Rate: 0.75, SuccessCount: 1000, @@ -396,7 +396,7 @@ func TestGetChunk(t *testing.T) { }, FilePath: path, } - chunk, _ := toChunk(&expected) + chunk, _ := ToChunk(&expected) return chunk }(), }, @@ -474,7 +474,7 @@ func TestFormatFrom(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := normalizeTime(tt.args.from, tt.args.step) + got := NormalizeTime(tt.args.from, tt.args.step) assert.Equal(t, got, tt.want) }) }